1use std::{
14 collections::HashMap,
15 fmt, str,
16 time::{SystemTime, UNIX_EPOCH},
17};
18
19use crate::{
20 error::Error,
21 partition::BoundPartitionField,
22 spec::{
23 partition::PartitionSpec,
24 sort::{self, SortOrder},
25 },
26};
27
28use serde::{Deserialize, Serialize};
29use serde_repr::{Deserialize_repr, Serialize_repr};
30use uuid::Uuid;
31
32use derive_builder::Builder;
33
34use super::{
35 schema::Schema,
36 snapshot::{Snapshot, SnapshotReference},
37 tabular::TabularMetadataRef,
38};
39
/// Name of the default branch; used as the key into `TableMetadata::refs` when the
/// metadata conversions register the current snapshot.
pub static MAIN_BRANCH: &str = "main";
/// Sort order id used when V1 metadata carries no `default-sort-order-id`.
static DEFAULT_SORT_ORDER_ID: i32 = 0;
/// Spec id given to the partition spec that is built from the V1 `partition-spec` list.
static DEFAULT_SPEC_ID: i32 = 0;

/// Property key `write.parquet.compression-codec`.
pub const WRITE_PARQUET_COMPRESSION_CODEC: &str = "write.parquet.compression-codec";
/// Property key `write.parquet.compression-level`.
pub const WRITE_PARQUET_COMPRESSION_LEVEL: &str = "write.parquet.compression-level";
/// Property key `write.object-storage.enabled`.
pub const WRITE_OBJECT_STORAGE_ENABLED: &str = "write.object-storage.enabled";
/// Property key `write.data.path`.
pub const WRITE_DATA_PATH: &str = "write.data.path";
50
51pub use _serde::{TableMetadataV1, TableMetadataV2};
52
53use _serde::TableMetadataEnum;
54
/// In-memory table metadata, independent of the on-disk format version.
///
/// (De)serialization goes through `TableMetadataEnum`, which picks the V1 or V2 layout.
/// A builder is derived; fields marked `#[builder(default)]` may be omitted there.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")]
pub struct TableMetadata {
    /// Format version; decides whether the V1 or V2 layout is written on serialization.
    #[builder(default)]
    pub format_version: FormatVersion,
    /// UUID of the table; the builder generates a random v4 UUID when none is set.
    #[builder(default = "Uuid::new_v4()")]
    pub table_uuid: Uuid,
    /// Base location of the table.
    #[builder(setter(into))]
    pub location: String,
    /// Last sequence number; V1 metadata has no such field and is read as 0.
    #[builder(default)]
    pub last_sequence_number: i64,
    /// Last update time in milliseconds since the Unix epoch; the builder defaults to "now".
    #[builder(
        default = "SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64"
    )]
    pub last_updated_ms: i64,
    /// Last column id (presumably the highest id assigned so far — confirm against the spec).
    #[builder(default)]
    pub last_column_id: i32,
    /// All known schemas, keyed by schema id.
    #[builder(setter(each(name = "with_schema")))]
    pub schemas: HashMap<i32, Schema>,
    /// Schema id used whenever a snapshot does not name its own schema.
    pub current_schema_id: i32,
    /// All known partition specs, keyed by spec id; the builder defaults to one default spec at id 0.
    #[builder(
        setter(each(name = "with_partition_spec")),
        default = "HashMap::from_iter(vec![(0,PartitionSpec::default())])"
    )]
    pub partition_specs: HashMap<i32, PartitionSpec>,
    /// Key into `partition_specs` returned by `default_partition_spec`.
    #[builder(default)]
    pub default_spec_id: i32,
    /// Last partition id (presumably the highest assigned partition field id — confirm).
    #[builder(default)]
    pub last_partition_id: i32,
    /// String key/value table properties.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// Id of the current snapshot; `None` is written as `-1` and `-1` is read back as `None`.
    #[builder(default)]
    pub current_snapshot_id: Option<i64>,
    /// All known snapshots, keyed by snapshot id.
    #[builder(default)]
    pub snapshots: HashMap<i64, Snapshot>,
    /// Log of (snapshot id, timestamp) entries.
    #[builder(default)]
    pub snapshot_log: Vec<SnapshotLog>,

    /// Log of (metadata file, timestamp) entries.
    #[builder(default)]
    pub metadata_log: Vec<MetadataLog>,
    /// All known sort orders, keyed by order id; the builder defaults to one default order at id 0.
    #[builder(
        setter(each(name = "with_sort_order")),
        default = "HashMap::from_iter(vec![(0, SortOrder::default())])"
    )]
    pub sort_orders: HashMap<i32, sort::SortOrder>,
    /// Id of the default sort order.
    #[builder(default)]
    pub default_sort_order_id: i32,
    /// Named snapshot references; the `"main"` entry is consulted when no reference is given.
    #[builder(default)]
    pub refs: HashMap<String, SnapshotReference>,
}
146
147impl TableMetadata {
148 #[inline]
156 pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
157 let schema_id = self
158 .current_snapshot(branch)?
159 .and_then(|x| *x.schema_id())
160 .unwrap_or(self.current_schema_id);
161 self.schemas
162 .get(&schema_id)
163 .ok_or_else(|| Error::InvalidFormat("schema".to_string()))
164 }
165
166 #[inline]
174 pub fn schema(&self, snapshot_id: i64) -> Result<&Schema, Error> {
175 let schema_id = self
176 .snapshots
177 .get(&snapshot_id)
178 .and_then(|x| *x.schema_id())
179 .unwrap_or(self.current_schema_id);
180 self.schemas
181 .get(&schema_id)
182 .ok_or_else(|| Error::InvalidFormat("schema".to_string()))
183 }
184
185 #[inline]
190 pub fn default_partition_spec(&self) -> Result<&PartitionSpec, Error> {
191 self.partition_specs
192 .get(&self.default_spec_id)
193 .ok_or_else(|| Error::InvalidFormat("partition spec".to_string()))
194 }
195
196 pub fn current_partition_fields(
205 &self,
206 branch: Option<&str>,
207 ) -> Result<Vec<BoundPartitionField<'_>>, Error> {
208 let schema = self.current_schema(branch)?;
209 let partition_spec = self.default_partition_spec()?;
210 partition_fields(partition_spec, schema)
211 }
212
213 pub fn partition_fields(
222 &self,
223 snapshot_id: i64,
224 ) -> Result<Vec<BoundPartitionField<'_>>, Error> {
225 let schema = self.schema(snapshot_id)?;
226 self.default_partition_spec()?
227 .fields()
228 .iter()
229 .map(|partition_field| {
230 let field =
231 schema
232 .get(*partition_field.source_id() as usize)
233 .ok_or(Error::NotFound(format!(
234 "Schema field with id {}",
235 partition_field.source_id()
236 )))?;
237 Ok(BoundPartitionField::new(partition_field, field))
238 })
239 .collect()
240 }
241
242 #[inline]
251 pub fn current_snapshot(&self, snapshot_ref: Option<&str>) -> Result<Option<&Snapshot>, Error> {
252 let snapshot_id = match snapshot_ref {
253 None => self
254 .refs
255 .get("main")
256 .map(|x| x.snapshot_id)
257 .or(self.current_snapshot_id),
258 Some(reference) => self.refs.get(reference).map(|x| x.snapshot_id),
259 };
260 match snapshot_id {
261 Some(snapshot_id) => Ok(self.snapshots.get(&snapshot_id)),
262 None => {
263 if self.snapshots.is_empty()
264 || (snapshot_ref.is_some() && snapshot_ref != Some("main"))
265 {
266 Ok(None)
267 } else {
268 Err(Error::InvalidFormat("snapshots".to_string()))
269 }
270 }
271 }
272 }
273
274 #[inline]
283 pub fn current_snapshot_mut(
284 &mut self,
285 snapshot_ref: Option<String>,
286 ) -> Result<Option<&mut Snapshot>, Error> {
287 let snapshot_id = match &snapshot_ref {
288 None => self
289 .refs
290 .get("main")
291 .map(|x| x.snapshot_id)
292 .or(self.current_snapshot_id),
293 Some(reference) => self.refs.get(reference).map(|x| x.snapshot_id),
294 };
295 match snapshot_id {
296 Some(-1) => {
297 if self.snapshots.is_empty()
298 || (snapshot_ref.is_some() && snapshot_ref.as_deref() != Some("main"))
299 {
300 Ok(None)
301 } else {
302 Err(Error::InvalidFormat("snapshots".to_string()))
303 }
304 }
305 Some(snapshot_id) => Ok(self.snapshots.get_mut(&snapshot_id)),
306 None => {
307 if self.snapshots.is_empty()
308 || (snapshot_ref.is_some() && snapshot_ref.as_deref() != Some("main"))
309 {
310 Ok(None)
311 } else {
312 Err(Error::InvalidFormat("snapshots".to_string()))
313 }
314 }
315 }
316 }
317
318 pub fn sequence_number(&self, snapshot_id: i64) -> Option<i64> {
326 self.snapshots
327 .get(&snapshot_id)
328 .map(|x| *x.sequence_number())
329 }
330
/// Wraps a borrow of this metadata in the `TabularMetadataRef::Table` variant.
pub fn as_ref(&self) -> TabularMetadataRef<'_> {
    TabularMetadataRef::Table(self)
}
334}
335
336pub fn partition_fields<'a>(
337 partition_spec: &'a PartitionSpec,
338 schema: &'a Schema,
339) -> Result<Vec<BoundPartitionField<'a>>, Error> {
340 partition_spec
341 .fields()
342 .iter()
343 .map(|partition_field| {
344 let field =
345 schema
346 .get(*partition_field.source_id() as usize)
347 .ok_or(Error::NotFound(format!(
348 "Schema field with id {}",
349 partition_field.source_id()
350 )))?;
351 Ok(BoundPartitionField::new(partition_field, field))
352 })
353 .collect()
354}
355
356pub fn new_metadata_location<'a, T: Into<TabularMetadataRef<'a>>>(metadata: T) -> String {
364 let metadata: TabularMetadataRef = metadata.into();
365 let transaction_uuid = Uuid::new_v4();
366 let version = metadata.sequence_number();
367
368 format!(
369 "{}/metadata/{:05}-{}.metadata.json",
370 metadata.location(),
371 version,
372 transaction_uuid
373 )
374}
375
376impl fmt::Display for TableMetadata {
377 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
378 write!(
379 f,
380 "{}",
381 &serde_json::to_string(self).map_err(|_| fmt::Error)?,
382 )
383 }
384}
385
386impl str::FromStr for TableMetadata {
387 type Err = Error;
388 fn from_str(s: &str) -> Result<Self, Self::Err> {
389 serde_json::from_str(s).map_err(Error::from)
390 }
391}
392
393pub mod _serde {
394 use std::collections::HashMap;
395
396 use itertools::Itertools;
397 use serde::{Deserialize, Serialize};
398 use uuid::Uuid;
399
400 use crate::{
401 error::Error,
402 spec::{
403 partition::{PartitionField, PartitionSpec},
404 schema,
405 snapshot::{
406 SnapshotReference, SnapshotRetention,
407 _serde::{SnapshotV1, SnapshotV2},
408 },
409 sort,
410 },
411 };
412
413 use super::{
414 FormatVersion, MetadataLog, SnapshotLog, TableMetadata, VersionNumber,
415 DEFAULT_SORT_ORDER_ID, DEFAULT_SPEC_ID, MAIN_BRANCH,
416 };
417
/// Serde entry point for table metadata of either format version.
///
/// The enum is untagged, so no variant tag appears in the JSON; the `format_version`
/// field of each variant type only accepts its own version number.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub(super) enum TableMetadataEnum {
    /// Format version 2 layout.
    V2(TableMetadataV2),
    /// Format version 1 layout.
    V1(TableMetadataV1),
}
427
/// Serde layout of format-version-2 table metadata.
///
/// Field names are (de)serialized in kebab-case, e.g. `table_uuid` as `table-uuid`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub struct TableMetadataV2 {
    /// Only the number 2 deserializes successfully.
    pub format_version: VersionNumber<2>,
    /// UUID of the table.
    pub table_uuid: Uuid,
    /// Base location of the table.
    pub location: String,
    /// Last sequence number.
    pub last_sequence_number: i64,
    /// Last update time in milliseconds.
    pub last_updated_ms: i64,
    /// Last column id.
    pub last_column_id: i32,
    /// List of schemas; turned into a map keyed by schema id on conversion.
    pub schemas: Vec<schema::SchemaV2>,
    /// Id of the current schema; must be one of `schemas` for conversion to succeed.
    pub current_schema_id: i32,
    /// List of partition specs; turned into a map keyed by spec id on conversion.
    pub partition_specs: Vec<PartitionSpec>,
    /// Id of the default partition spec.
    pub default_spec_id: i32,
    /// Last partition id.
    pub last_partition_id: i32,
    /// Table properties; omitted from the output when empty, empty when absent.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub properties: HashMap<String, String>,
    /// Id of the current snapshot; `-1` is converted to "no current snapshot".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub current_snapshot_id: Option<i64>,
    /// List of snapshots; a missing list converts to an empty map.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub snapshots: Option<Vec<SnapshotV2>>,
    /// Snapshot log; omitted from the output when empty, empty when absent.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub snapshot_log: Vec<SnapshotLog>,

    /// Metadata log; omitted from the output when empty, empty when absent.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub metadata_log: Vec<MetadataLog>,

    /// List of sort orders (no serde default is declared for this field).
    pub sort_orders: Vec<sort::SortOrder>,
    /// Id of the default sort order.
    pub default_sort_order_id: i32,
    /// Named snapshot references; omitted from the output when empty, empty when absent.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub refs: HashMap<String, SnapshotReference>,
}
500
/// Serde layout of format-version-1 table metadata.
///
/// Field names are (de)serialized in kebab-case. Compared with `TableMetadataV2` many
/// fields are optional, and there is neither a `last_sequence_number` nor a `refs` field.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub struct TableMetadataV1 {
    /// Only the number 1 deserializes successfully.
    pub format_version: VersionNumber<1>,
    /// UUID of the table, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table_uuid: Option<Uuid>,
    /// Base location of the table.
    pub location: String,
    /// Last update time in milliseconds.
    pub last_updated_ms: i64,
    /// Last column id.
    pub last_column_id: i32,
    /// The single schema; only used on conversion when `schemas` is absent.
    pub schema: schema::SchemaV1,
    /// Optional list of schemas; takes precedence over `schema` when present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schemas: Option<Vec<schema::SchemaV1>>,
    /// Optional id of the current schema.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub current_schema_id: Option<i32>,
    /// Partition fields; only used on conversion when `partition_specs` is absent.
    pub partition_spec: Vec<PartitionField>,
    /// Optional list of partition specs; takes precedence over `partition_spec`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub partition_specs: Option<Vec<PartitionSpec>>,
    /// Optional id of the default partition spec.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_spec_id: Option<i32>,
    /// Optional last partition id.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_partition_id: Option<i32>,
    /// Table properties; omitted from the output when empty, empty when absent.
    #[serde(skip_serializing_if = "HashMap::is_empty", default)]
    pub properties: HashMap<String, String>,
    /// Id of the current snapshot; `-1` is converted to "no current snapshot".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub current_snapshot_id: Option<i64>,
    /// List of snapshots; a missing list converts to an empty map.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub snapshots: Option<Vec<SnapshotV1>>,
    /// Snapshot log; omitted from the output when empty, empty when absent.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub snapshot_log: Vec<SnapshotLog>,

    /// Metadata log; omitted from the output when empty, empty when absent.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub metadata_log: Vec<MetadataLog>,

    /// Optional list of sort orders; a missing list converts to an empty map.
    pub sort_orders: Option<Vec<sort::SortOrder>>,
    /// Optional id of the default sort order; `DEFAULT_SORT_ORDER_ID` when absent.
    pub default_sort_order_id: Option<i32>,
}
576
577 impl TryFrom<TableMetadataEnum> for TableMetadata {
578 type Error = Error;
579 fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
580 match value {
581 TableMetadataEnum::V2(value) => value.try_into(),
582 TableMetadataEnum::V1(value) => value.try_into(),
583 }
584 }
585 }
586
587 impl From<TableMetadata> for TableMetadataEnum {
588 fn from(value: TableMetadata) -> Self {
589 match value.format_version {
590 FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
591 FormatVersion::V1 => TableMetadataEnum::V1(value.into()),
592 }
593 }
594 }
595
596 impl TryFrom<TableMetadataV2> for TableMetadata {
597 type Error = Error;
598 fn try_from(value: TableMetadataV2) -> Result<Self, Error> {
599 let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
600 None
601 } else {
602 value.current_snapshot_id
603 };
604 let schemas = HashMap::from_iter(
605 value
606 .schemas
607 .into_iter()
608 .map(|schema| Ok((schema.schema_id, schema.try_into()?)))
609 .collect::<Result<Vec<_>, Error>>()?,
610 );
611 let mut refs = value.refs;
612 if let Some(snapshot_id) = current_snapshot_id {
613 refs.entry(MAIN_BRANCH.to_string())
614 .or_insert(SnapshotReference {
615 snapshot_id,
616 retention: SnapshotRetention::default(),
617 });
618 }
619 Ok(TableMetadata {
620 format_version: FormatVersion::V2,
621 table_uuid: value.table_uuid,
622 location: value.location,
623 last_sequence_number: value.last_sequence_number,
624 last_updated_ms: value.last_updated_ms,
625 last_column_id: value.last_column_id,
626 current_schema_id: if schemas.keys().contains(&value.current_schema_id) {
627 Ok(value.current_schema_id)
628 } else {
629 Err(Error::InvalidFormat("schema".to_string()))
630 }?,
631 schemas,
632 partition_specs: HashMap::from_iter(
633 value.partition_specs.into_iter().map(|x| (*x.spec_id(), x)),
634 ),
635 default_spec_id: value.default_spec_id,
636 last_partition_id: value.last_partition_id,
637 properties: value.properties,
638 current_snapshot_id,
639 snapshots: value
640 .snapshots
641 .map(|snapshots| {
642 HashMap::from_iter(snapshots.into_iter().map(|x| (x.snapshot_id, x.into())))
643 })
644 .unwrap_or_default(),
645 snapshot_log: value.snapshot_log,
646 metadata_log: value.metadata_log,
647 sort_orders: HashMap::from_iter(
648 value.sort_orders.into_iter().map(|x| (x.order_id, x)),
649 ),
650 default_sort_order_id: value.default_sort_order_id,
651 refs,
652 })
653 }
654 }
655
656 impl TryFrom<TableMetadataV1> for TableMetadata {
657 type Error = Error;
658 fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
659 let schemas = value
660 .schemas
661 .map(|schemas| {
662 Ok::<_, Error>(HashMap::from_iter(
663 schemas
664 .into_iter()
665 .enumerate()
666 .map(|(i, schema)| {
667 Ok((schema.schema_id.unwrap_or(i as i32), schema.try_into()?))
668 })
669 .collect::<Result<Vec<_>, Error>>()?
670 .into_iter(),
671 ))
672 })
673 .or_else(|| {
674 Some(Ok(HashMap::from_iter(vec![(
675 value.schema.schema_id.unwrap_or(0),
676 value.schema.try_into().ok()?,
677 )])))
678 })
679 .transpose()?
680 .unwrap_or_default();
681 let partition_specs = HashMap::from_iter(
682 value
683 .partition_specs
684 .unwrap_or_else(|| {
685 vec![PartitionSpec::builder()
686 .with_spec_id(DEFAULT_SPEC_ID)
687 .with_fields(value.partition_spec)
688 .build()
689 .unwrap()]
690 })
691 .into_iter()
692 .map(|x| (*x.spec_id(), x)),
693 );
694 Ok(TableMetadata {
695 format_version: FormatVersion::V1,
696 table_uuid: value.table_uuid.unwrap_or_default(),
697 location: value.location,
698 last_sequence_number: 0,
699 last_updated_ms: value.last_updated_ms,
700 last_column_id: value.last_column_id,
701 current_schema_id: value
702 .current_schema_id
703 .unwrap_or_else(|| schemas.keys().copied().max().unwrap_or_default()),
704 default_spec_id: value
705 .default_spec_id
706 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
707 last_partition_id: value
708 .last_partition_id
709 .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
710 partition_specs,
711 schemas,
712
713 properties: value.properties,
714 current_snapshot_id: if let &Some(id) = &value.current_snapshot_id {
715 if id == -1 {
716 None
717 } else {
718 Some(id)
719 }
720 } else {
721 value.current_snapshot_id
722 },
723 snapshots: value
724 .snapshots
725 .map(|snapshots| {
726 Ok::<_, Error>(HashMap::from_iter(
727 snapshots
728 .into_iter()
729 .map(|x| Ok((x.snapshot_id, x.into())))
730 .collect::<Result<Vec<_>, Error>>()?,
731 ))
732 })
733 .transpose()?
734 .unwrap_or_default(),
735 snapshot_log: value.snapshot_log,
736 metadata_log: value.metadata_log,
737 sort_orders: match value.sort_orders {
738 Some(sort_orders) => {
739 HashMap::from_iter(sort_orders.into_iter().map(|x| (x.order_id, x)))
740 }
741 None => HashMap::new(),
742 },
743 default_sort_order_id: value.default_sort_order_id.unwrap_or(DEFAULT_SORT_ORDER_ID),
744 refs: HashMap::from_iter(vec![(
745 MAIN_BRANCH.to_string(),
746 SnapshotReference {
747 snapshot_id: value.current_snapshot_id.unwrap_or_default(),
748 retention: SnapshotRetention::Branch {
749 min_snapshots_to_keep: None,
750 max_snapshot_age_ms: None,
751 max_ref_age_ms: None,
752 },
753 },
754 )]),
755 })
756 }
757 }
758
759 impl From<TableMetadata> for TableMetadataV2 {
760 fn from(v: TableMetadata) -> Self {
761 TableMetadataV2 {
762 format_version: VersionNumber::<2>,
763 table_uuid: v.table_uuid,
764 location: v.location,
765 last_sequence_number: v.last_sequence_number,
766 last_updated_ms: v.last_updated_ms,
767 last_column_id: v.last_column_id,
768 schemas: v.schemas.into_values().map(|x| x.into()).collect(),
769 current_schema_id: v.current_schema_id,
770 partition_specs: v.partition_specs.into_values().collect(),
771 default_spec_id: v.default_spec_id,
772 last_partition_id: v.last_partition_id,
773 properties: v.properties,
774 current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
775 snapshots: Some(v.snapshots.into_values().map(|x| x.into()).collect()),
776 snapshot_log: v.snapshot_log,
777 metadata_log: v.metadata_log,
778 sort_orders: v.sort_orders.into_values().collect(),
779 default_sort_order_id: v.default_sort_order_id,
780 refs: v.refs,
781 }
782 }
783 }
784
785 impl From<TableMetadata> for TableMetadataV1 {
786 fn from(v: TableMetadata) -> Self {
787 TableMetadataV1 {
788 format_version: VersionNumber::<1>,
789 table_uuid: Some(v.table_uuid),
790 location: v.location,
791 last_updated_ms: v.last_updated_ms,
792 last_column_id: v.last_column_id,
793 schema: v.schemas.get(&v.current_schema_id).unwrap().clone().into(),
794 schemas: Some(v.schemas.into_values().map(|x| x.into()).collect()),
795 current_schema_id: Some(v.current_schema_id),
796 partition_spec: v
797 .partition_specs
798 .get(&v.default_spec_id)
799 .map(|x| x.fields().clone())
800 .unwrap_or_default(),
801 partition_specs: Some(v.partition_specs.into_values().collect()),
802 default_spec_id: Some(v.default_spec_id),
803 last_partition_id: Some(v.last_partition_id),
804 properties: v.properties,
805 current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
806 snapshots: Some(v.snapshots.into_values().map(|x| x.into()).collect()),
807 snapshot_log: v.snapshot_log,
808 metadata_log: v.metadata_log,
809 sort_orders: Some(v.sort_orders.into_values().collect()),
810 default_sort_order_id: Some(v.default_sort_order_id),
811 }
812 }
813 }
814}
815
/// Zero-sized marker for a fixed format version `V`.
///
/// Its serde impls write `V` as a `u8` and reject any other value when reading.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct VersionNumber<const V: u8>;
819
820impl<const V: u8> Serialize for VersionNumber<V> {
821 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
822 where
823 S: serde::Serializer,
824 {
825 serializer.serialize_u8(V)
826 }
827}
828
829impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
830 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
831 where
832 D: serde::Deserializer<'de>,
833 {
834 let value = u8::deserialize(deserializer)?;
835 if value == V {
836 Ok(VersionNumber::<V>)
837 } else {
838 Err(serde::de::Error::custom("Invalid Version"))
839 }
840 }
841}
842
/// One entry of the metadata log; (de)serialized with kebab-case field names.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct MetadataLog {
    /// Path of the metadata file (`metadata-file`).
    pub metadata_file: String,
    /// Timestamp in milliseconds (`timestamp-ms`).
    pub timestamp_ms: i64,
}
852
/// One entry of the snapshot log; (de)serialized with kebab-case field names.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotLog {
    /// Id of the snapshot (`snapshot-id`).
    pub snapshot_id: i64,
    /// Timestamp in milliseconds (`timestamp-ms`).
    pub timestamp_ms: i64,
}
862
/// Format version of the table metadata; `V2` is the default.
///
/// NOTE(review): the discriminants are the ASCII digit bytes `b'1'` (49) and `b'2'` (50),
/// so the `*_repr` serde derives presumably read/write 49 and 50, whereas the
/// `TryFrom<u8>` impl in this file accepts the numeric values 1 and 2 — confirm which
/// encoding is intended.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
#[derive(Default)]
pub enum FormatVersion {
    /// Format version 1.
    V1 = b'1',
    /// Format version 2.
    #[default]
    V2 = b'2',
}
874
875impl TryFrom<u8> for FormatVersion {
876 type Error = Error;
877 fn try_from(value: u8) -> Result<Self, Self::Error> {
878 match value {
879 1 => Ok(FormatVersion::V1),
880 2 => Ok(FormatVersion::V2),
881 _ => Err(Error::Conversion(
882 "u8".to_string(),
883 "format version".to_string(),
884 )),
885 }
886 }
887}
888
889impl From<FormatVersion> for u8 {
890 fn from(value: FormatVersion) -> Self {
891 match value {
892 FormatVersion::V1 => b'1',
893 FormatVersion::V2 => b'2',
894 }
895 }
896}
897
898impl From<FormatVersion> for i32 {
899 fn from(value: FormatVersion) -> Self {
900 match value {
901 FormatVersion::V1 => 1,
902 FormatVersion::V2 => 2,
903 }
904 }
905}
906
907#[cfg(test)]
908mod tests {
909
910 use std::{collections::HashMap, fs};
911
912 use uuid::Uuid;
913
914 use crate::{
915 error::Error,
916 spec::{
917 partition::{PartitionField, PartitionSpec, Transform},
918 schema::SchemaBuilder,
919 snapshot::{Operation, SnapshotBuilder, SnapshotReference, SnapshotRetention, Summary},
920 sort::{NullOrder, SortDirection, SortField, SortOrderBuilder},
921 table_metadata::TableMetadata,
922 types::{PrimitiveType, StructField, Type},
923 },
924 };
925
926 use super::{FormatVersion, SnapshotLog};
927
928 fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
929 let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
930 assert_eq!(desered_type, expected_type);
931
932 let sered_json = serde_json::to_string(&expected_type).unwrap();
933 let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
934
935 assert_eq!(parsed_json_value, desered_type);
936 }
937
/// Deserializes inline V2 metadata and checks that a serialize/deserialize round trip
/// reproduces the same value.
#[test]
fn test_deserialize_table_data_v2() -> Result<(), Error> {
    let data = r#"
{
"format-version" : 2,
"table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
"location": "s3://b/wh/data.db/table",
"last-sequence-number" : 1,
"last-updated-ms": 1515100955770,
"last-column-id": 1,
"schemas": [
{
"schema-id" : 1,
"type" : "struct",
"fields" :[
{
"id": 1,
"name": "struct_name",
"required": true,
"type": "fixed[1]"
}
]
}
],
"current-schema-id" : 1,
"partition-specs": [
{
"spec-id": 1,
"fields": [
{
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}
]
}
],
"default-spec-id": 1,
"last-partition-id": 1,
"properties": {
"commit.retry.num-retries": "1"
},
"metadata-log": [
{
"metadata-file": "s3://bucket/.../v1.json",
"timestamp-ms": 1515100
}
],
"sort-orders": [],
"default-sort-order-id": 0
}
"#;
    let metadata =
        serde_json::from_str::<TableMetadata>(data).expect("Failed to deserialize json");
    let metadata_two: TableMetadata = serde_json::from_str(
        &serde_json::to_string(&metadata).expect("Failed to serialize metadata"),
    )
    // This step parses the serialized output again, so a failure is a deserialize failure.
    .expect("Failed to deserialize serialized metadata");
    assert_eq!(metadata, metadata_two);

    Ok(())
}
1002
/// Deserializes inline V1 metadata and checks that a serialize/deserialize round trip
/// reproduces the same value.
#[test]
fn test_deserialize_table_data_v1() -> Result<(), Error> {
    let data = r#"
{
"format-version" : 1,
"table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
"location" : "/home/iceberg/warehouse/nyc/taxis",
"last-updated-ms" : 1662532818843,
"last-column-id" : 5,
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "vendor_id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "trip_id",
"required" : false,
"type" : "long"
}, {
"id" : 3,
"name" : "trip_distance",
"required" : false,
"type" : "float"
}, {
"id" : 4,
"name" : "fare_amount",
"required" : false,
"type" : "double"
}, {
"id" : 5,
"name" : "store_and_fwd_flag",
"required" : false,
"type" : "string"
} ]
},
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "vendor_id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "trip_id",
"required" : false,
"type" : "long"
}, {
"id" : 3,
"name" : "trip_distance",
"required" : false,
"type" : "float"
}, {
"id" : 4,
"name" : "fare_amount",
"required" : false,
"type" : "double"
}, {
"id" : 5,
"name" : "store_and_fwd_flag",
"required" : false,
"type" : "string"
} ]
} ],
"partition-spec" : [ {
"name" : "vendor_id",
"transform" : "identity",
"source-id" : 1,
"field-id" : 1000
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "vendor_id",
"transform" : "identity",
"source-id" : 1,
"field-id" : 1000
} ]
} ],
"last-partition-id" : 1000,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "root"
},
"current-snapshot-id" : 638933773299822130,
"refs" : {
"main" : {
"snapshot-id" : 638933773299822130,
"type" : "branch"
}
},
"snapshots" : [ {
"snapshot-id" : 638933773299822130,
"timestamp-ms" : 1662532818843,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1662532784305",
"added-data-files" : "4",
"added-records" : "4",
"added-files-size" : "6001",
"changed-partition-count" : "2",
"total-records" : "4",
"total-files-size" : "6001",
"total-data-files" : "4",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
"schema-id" : 0
} ],
"snapshot-log" : [ {
"timestamp-ms" : 1662532818843,
"snapshot-id" : 638933773299822130
} ],
"metadata-log" : [ {
"timestamp-ms" : 1662532805245,
"metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
} ]
}
"#;
    let metadata =
        serde_json::from_str::<TableMetadata>(data).expect("Failed to deserialize json");
    let metadata_two: TableMetadata = serde_json::from_str(
        &serde_json::to_string(&metadata).expect("Failed to serialize metadata"),
    )
    // This step parses the serialized output again, so a failure is a deserialize failure.
    .expect("Failed to deserialize serialized metadata");
    assert_eq!(metadata, metadata_two);

    Ok(())
}
1146
// Reads `TableMetadataV2Valid.json` and checks that it deserializes to the hand-built
// `expected` value below, and that `expected` survives a serde round trip.
#[test]
fn test_table_metadata_v2_file_valid() {
    let metadata =
        fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

    // Schema 0: a single required long column `x`.
    let schema1 = SchemaBuilder::default()
        .with_schema_id(0)
        .with_struct_field(StructField {
            id: 1,
            name: "x".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .build()
        .unwrap();

    // Schema 1: columns `x`, `y`, `z` with identifier fields 1 and 2.
    let schema2 = SchemaBuilder::default()
        .with_schema_id(1)
        .with_struct_field(StructField {
            id: 1,
            name: "x".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 2,
            name: "y".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: Some("comment".to_owned()),
        })
        .with_struct_field(StructField {
            id: 3,
            name: "z".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_identifier_field_ids(vec![1, 2])
        .build()
        .unwrap();

    let partition_spec = PartitionSpec::builder()
        .with_partition_field(PartitionField::new(1, 1000, "x", Transform::Identity))
        .build()
        .unwrap();

    let sort_order = SortOrderBuilder::default()
        .with_order_id(3)
        .with_sort_field(SortField {
            source_id: 2,
            transform: Transform::Identity,
            direction: SortDirection::Ascending,
            null_order: NullOrder::First,
        })
        .with_sort_field(SortField {
            source_id: 3,
            transform: Transform::Bucket(4),
            direction: SortDirection::Descending,
            null_order: NullOrder::Last,
        })
        .build()
        .unwrap();

    // First snapshot: no parent and no schema id.
    let snapshot1 = SnapshotBuilder::default()
        .with_snapshot_id(3051729675574597004)
        .with_timestamp_ms(1515100955770)
        .with_sequence_number(0)
        .with_manifest_list("s3://a/b/1.avro".to_string())
        .with_summary(Summary {
            operation: Operation::Append,
            other: HashMap::new(),
        })
        .build()
        .expect("Failed to create snapshot");

    // Second snapshot: child of the first, written with schema 1.
    let snapshot2 = SnapshotBuilder::default()
        .with_snapshot_id(3055729675574597004)
        .with_parent_snapshot_id(3051729675574597004)
        .with_timestamp_ms(1555100955770)
        .with_sequence_number(1)
        .with_schema_id(1)
        .with_manifest_list("s3://a/b/2.avro".to_string())
        .with_summary(Summary {
            operation: Operation::Append,
            other: HashMap::new(),
        })
        .build()
        .expect("Failed to create snapshot");

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from_iter(vec![(0, schema1), (1, schema2)]),
        current_schema_id: 1,
        partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
        default_spec_id: 0,
        last_partition_id: 1000,
        default_sort_order_id: 3,
        sort_orders: HashMap::from_iter(vec![(3, sort_order)]),
        snapshots: HashMap::from_iter(vec![
            (3051729675574597004, snapshot1),
            (3055729675574597004, snapshot2),
        ]),
        current_snapshot_id: Some(3055729675574597004),
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: vec![
            SnapshotLog {
                snapshot_id: 3051729675574597004,
                timestamp_ms: 1515100955770,
            },
            SnapshotLog {
                snapshot_id: 3055729675574597004,
                timestamp_ms: 1555100955770,
            },
        ],
        metadata_log: Vec::new(),
        // The V2 conversion adds a "main" reference to the current snapshot.
        refs: HashMap::from_iter(vec![(
            "main".to_string(),
            SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            },
        )]),
    };

    check_table_metadata_serde(&metadata, expected);
}
1285
// Reads `TableMetadataV2ValidMinimal.json` and checks it against the hand-built
// `expected` value, which has no snapshots, logs, properties or refs.
#[test]
fn test_table_metadata_v2_file_valid_minimal() {
    let metadata =
        fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

    let schema = SchemaBuilder::default()
        .with_schema_id(0)
        .with_struct_field(StructField {
            id: 1,
            name: "x".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 2,
            name: "y".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: Some("comment".to_owned()),
        })
        .with_struct_field(StructField {
            id: 3,
            name: "z".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .build()
        .unwrap();

    let partition_spec = PartitionSpec::builder()
        .with_partition_field(PartitionField::new(1, 1000, "x", Transform::Identity))
        .build()
        .unwrap();

    let sort_order = SortOrderBuilder::default()
        .with_order_id(3)
        .with_sort_field(SortField {
            source_id: 2,
            transform: Transform::Identity,
            direction: SortDirection::Ascending,
            null_order: NullOrder::First,
        })
        .with_sort_field(SortField {
            source_id: 3,
            transform: Transform::Bucket(4),
            direction: SortDirection::Descending,
            null_order: NullOrder::Last,
        })
        .build()
        .unwrap();

    let expected = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573590,
        last_column_id: 3,
        schemas: HashMap::from_iter(vec![(0, schema)]),
        current_schema_id: 0,
        partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
        default_spec_id: 0,
        last_partition_id: 1000,
        default_sort_order_id: 3,
        sort_orders: HashMap::from_iter(vec![(3, sort_order)]),
        snapshots: HashMap::default(),
        current_snapshot_id: None,
        last_sequence_number: 34,
        properties: HashMap::new(),
        snapshot_log: vec![],
        metadata_log: Vec::new(),
        // Without a current snapshot the V2 conversion adds no "main" reference.
        refs: HashMap::new(),
    };

    check_table_metadata_serde(&metadata, expected);
}
1363
// Reads `TableMetadataV1Valid.json` and checks it against the hand-built `expected`
// value for the V1 conversion.
#[test]
fn test_table_metadata_v1_file_valid() {
    let metadata =
        fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

    let schema = SchemaBuilder::default()
        .with_schema_id(0)
        .with_struct_field(StructField {
            id: 1,
            name: "x".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 2,
            name: "y".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: Some("comment".to_owned()),
        })
        .with_struct_field(StructField {
            id: 3,
            name: "z".to_owned(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .build()
        .unwrap();

    let partition_spec = PartitionSpec::builder()
        .with_partition_field(PartitionField::new(1, 1000, "x", Transform::Identity))
        .build()
        .unwrap();

    let expected = TableMetadata {
        format_version: FormatVersion::V1,
        table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
        location: "s3://bucket/test/location".to_string(),
        last_updated_ms: 1602638573874,
        last_column_id: 3,
        schemas: HashMap::from_iter(vec![(0, schema)]),
        current_schema_id: 0,
        partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
        default_spec_id: 0,
        last_partition_id: 0,
        default_sort_order_id: 0,
        sort_orders: HashMap::new(),
        snapshots: HashMap::new(),
        current_snapshot_id: None,
        // The V1 conversion always sets the sequence number to 0.
        last_sequence_number: 0,
        properties: HashMap::new(),
        snapshot_log: vec![],
        metadata_log: Vec::new(),
        // The V1 conversion always registers "main", here with the raw id -1.
        refs: HashMap::from_iter(vec![(
            "main".to_string(),
            SnapshotReference {
                snapshot_id: -1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            },
        )]),
    };

    check_table_metadata_serde(&metadata, expected);
}
1434
/// V2 metadata without sort orders must be rejected by the untagged-enum deserializer.
#[test]
fn test_table_metadata_v2_missing_sort_order() {
    let json = fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
        .unwrap();

    let error = serde_json::from_str::<TableMetadata>(&json).unwrap_err();

    assert_eq!(
        error.to_string(),
        "data did not match any variant of untagged enum TableMetadataEnum"
    );
}
1448}