1use std::collections::{HashMap, HashSet};
2use std::fmt::{self, Display};
3use std::str::FromStr;
4
5use delta_kernel::schema::{DataType, StructField};
6use delta_kernel::table_features::TableFeature;
7use serde::{Deserialize, Serialize};
8
9use crate::TableProperty;
10use crate::kernel::{DeltaResult, error::Error};
11use crate::kernel::{StructType, StructTypeExt};
12
13pub use delta_kernel::actions::{Metadata, Protocol};
14
15pub fn new_metadata(
22 schema: &StructType,
23 partition_columns: impl IntoIterator<Item = impl ToString>,
24 configuration: impl IntoIterator<Item = (impl ToString, impl ToString)>,
25) -> DeltaResult<Metadata> {
26 let value = serde_json::json!({
28 "id": uuid::Uuid::new_v4().to_string(),
29 "name": None::<String>,
30 "description": None::<String>,
31 "format": { "provider": "parquet", "options": {} },
32 "schemaString": serde_json::to_string(schema)?,
33 "partitionColumns": partition_columns.into_iter().map(|c| c.to_string()).collect::<Vec<_>>(),
34 "configuration": configuration.into_iter().map(|(k, v)| (k.to_string(), v.to_string())).collect::<HashMap<_, _>>(),
35 "createdTime": chrono::Utc::now().timestamp_millis(),
36 });
37 Ok(serde_json::from_value(value)?)
38}
39
40pub trait MetadataExt {
46 fn with_table_id(self, table_id: String) -> DeltaResult<Metadata>;
47
48 fn with_name(self, name: String) -> DeltaResult<Metadata>;
49
50 fn with_description(self, description: String) -> DeltaResult<Metadata>;
51
52 fn with_schema(self, schema: &StructType) -> DeltaResult<Metadata>;
53
54 fn add_config_key(self, key: String, value: String) -> DeltaResult<Metadata>;
55
56 fn remove_config_key(self, key: &str) -> DeltaResult<Metadata>;
57}
58
59impl MetadataExt for Metadata {
60 fn with_table_id(self, table_id: String) -> DeltaResult<Metadata> {
61 let value = serde_json::json!({
62 "id": table_id,
63 "name": self.name(),
64 "description": self.description(),
65 "format": { "provider": "parquet", "options": {} },
66 "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
67 "partitionColumns": self.partition_columns(),
68 "configuration": self.configuration(),
69 "createdTime": self.created_time(),
70 });
71 Ok(serde_json::from_value(value)?)
72 }
73
74 fn with_name(self, name: String) -> DeltaResult<Metadata> {
75 let value = serde_json::json!({
76 "id": self.id(),
77 "name": name,
78 "description": self.description(),
79 "format": { "provider": "parquet", "options": {} },
80 "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
81 "partitionColumns": self.partition_columns(),
82 "configuration": self.configuration(),
83 "createdTime": self.created_time(),
84 });
85 Ok(serde_json::from_value(value)?)
86 }
87
88 fn with_description(self, description: String) -> DeltaResult<Metadata> {
89 let value = serde_json::json!({
90 "id": self.id(),
91 "name": self.name(),
92 "description": description,
93 "format": { "provider": "parquet", "options": {} },
94 "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
95 "partitionColumns": self.partition_columns(),
96 "configuration": self.configuration(),
97 "createdTime": self.created_time(),
98 });
99 Ok(serde_json::from_value(value)?)
100 }
101
102 fn with_schema(self, schema: &StructType) -> DeltaResult<Metadata> {
103 let value = serde_json::json!({
104 "id": self.id(),
105 "name": self.name(),
106 "description": self.description(),
107 "format": { "provider": "parquet", "options": {} },
108 "schemaString": serde_json::to_string(schema)?,
109 "partitionColumns": self.partition_columns(),
110 "configuration": self.configuration(),
111 "createdTime": self.created_time(),
112 });
113 Ok(serde_json::from_value(value)?)
114 }
115
116 fn add_config_key(self, key: String, value: String) -> DeltaResult<Metadata> {
117 let mut config = self.configuration().clone();
118 config.insert(key, value);
119 let value = serde_json::json!({
120 "id": self.id(),
121 "name": self.name(),
122 "description": self.description(),
123 "format": { "provider": "parquet", "options": {} },
124 "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
125 "partitionColumns": self.partition_columns(),
126 "configuration": config,
127 "createdTime": self.created_time(),
128 });
129 Ok(serde_json::from_value(value)?)
130 }
131
132 fn remove_config_key(self, key: &str) -> DeltaResult<Metadata> {
133 let mut config = self.configuration().clone();
134 config.remove(key);
135 let value = serde_json::json!({
136 "id": self.id(),
137 "name": self.name(),
138 "description": self.description(),
139 "format": { "provider": "parquet", "options": {} },
140 "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
141 "partitionColumns": self.partition_columns(),
142 "configuration": config,
143 "createdTime": self.created_time(),
144 });
145 Ok(serde_json::from_value(value)?)
146 }
147}
148
149pub fn contains_timestampntz<'a>(mut fields: impl Iterator<Item = &'a StructField>) -> bool {
151 fn _check_type(dtype: &DataType) -> bool {
152 match dtype {
153 &DataType::TIMESTAMP_NTZ => true,
154 DataType::Array(inner) => _check_type(inner.element_type()),
155 DataType::Struct(inner) => inner.fields().any(|f| _check_type(f.data_type())),
156 _ => false,
157 }
158 }
159 fields.any(|f| _check_type(f.data_type()))
160}
161
162pub(crate) trait ProtocolExt {
167 fn reader_features_set(&self) -> Option<HashSet<TableFeature>>;
168 fn writer_features_set(&self) -> Option<HashSet<TableFeature>>;
169 fn append_reader_features(self, reader_features: &[TableFeature]) -> Protocol;
170 fn append_writer_features(self, writer_features: &[TableFeature]) -> Protocol;
171 fn move_table_properties_into_features(
172 self,
173 configuration: &HashMap<String, String>,
174 ) -> Protocol;
175 fn apply_column_metadata_to_protocol(self, schema: &StructType) -> DeltaResult<Protocol>;
176 fn apply_properties_to_protocol(
177 self,
178 new_properties: &HashMap<String, String>,
179 raise_if_not_exists: bool,
180 ) -> DeltaResult<Protocol>;
181}
182
183impl ProtocolExt for Protocol {
184 fn reader_features_set(&self) -> Option<HashSet<TableFeature>> {
185 self.reader_features()
186 .map(|features| features.iter().cloned().collect())
187 }
188
189 fn writer_features_set(&self) -> Option<HashSet<TableFeature>> {
190 self.writer_features()
191 .map(|features| features.iter().cloned().collect())
192 }
193
194 fn append_reader_features(self, reader_features: &[TableFeature]) -> Protocol {
195 let mut inner = ProtocolInner::from_kernel(&self);
196 inner = inner.append_reader_features(reader_features.iter().cloned());
197 inner.as_kernel()
198 }
199
200 fn append_writer_features(self, writer_features: &[TableFeature]) -> Protocol {
201 let mut inner = ProtocolInner::from_kernel(&self);
202 inner = inner.append_writer_features(writer_features.iter().cloned());
203 inner.as_kernel()
204 }
205
206 fn move_table_properties_into_features(
207 self,
208 configuration: &HashMap<String, String>,
209 ) -> Protocol {
210 let mut inner = ProtocolInner::from_kernel(&self);
211 inner = inner.move_table_properties_into_features(configuration);
212 inner.as_kernel()
213 }
214
215 fn apply_column_metadata_to_protocol(self, schema: &StructType) -> DeltaResult<Protocol> {
216 let mut inner = ProtocolInner::from_kernel(&self);
217 inner = inner.apply_column_metadata_to_protocol(schema)?;
218 Ok(inner.as_kernel())
219 }
220
221 fn apply_properties_to_protocol(
222 self,
223 new_properties: &HashMap<String, String>,
224 raise_if_not_exists: bool,
225 ) -> DeltaResult<Protocol> {
226 let mut inner = ProtocolInner::from_kernel(&self);
227 inner = inner.apply_properties_to_protocol(new_properties, raise_if_not_exists)?;
228 Ok(inner.as_kernel())
229 }
230}
231
232#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
233#[serde(rename_all = "camelCase")]
234pub(crate) struct ProtocolInner {
242 pub min_reader_version: i32,
245 pub min_writer_version: i32,
248 #[serde(skip_serializing_if = "Option::is_none")]
251 pub reader_features: Option<HashSet<TableFeature>>,
252 #[serde(skip_serializing_if = "Option::is_none")]
255 pub writer_features: Option<HashSet<TableFeature>>,
256}
257
258impl Default for ProtocolInner {
259 fn default() -> Self {
260 Self {
261 min_reader_version: 1,
262 min_writer_version: 2,
263 reader_features: None,
264 writer_features: None,
265 }
266 }
267}
268
269impl ProtocolInner {
270 #[cfg(test)]
272 pub(crate) fn new(min_reader_version: i32, min_writer_version: i32) -> Self {
273 Self {
274 min_reader_version,
275 min_writer_version,
276 reader_features: None,
277 writer_features: None,
278 }
279 }
280
281 pub(crate) fn from_kernel(value: &Protocol) -> ProtocolInner {
282 serde_json::from_value(serde_json::to_value(value).unwrap()).unwrap()
284 }
285
286 pub(crate) fn as_kernel(&self) -> Protocol {
287 serde_json::from_value(serde_json::to_value(self).unwrap()).unwrap()
289 }
290
291 pub fn append_reader_features(
293 mut self,
294 reader_features: impl IntoIterator<Item = impl Into<TableFeature>>,
295 ) -> Self {
296 let all_reader_features = reader_features
297 .into_iter()
298 .map(Into::into)
299 .collect::<HashSet<_>>();
300 if !all_reader_features.is_empty() {
301 self.min_reader_version = 3;
302 match self.reader_features {
303 Some(mut features) => {
304 features.extend(all_reader_features);
305 self.reader_features = Some(features);
306 }
307 None => self.reader_features = Some(all_reader_features),
308 };
309 }
310 self
311 }
312
313 pub fn append_writer_features(
315 mut self,
316 writer_features: impl IntoIterator<Item = impl Into<TableFeature>>,
317 ) -> Self {
318 let all_writer_features = writer_features
319 .into_iter()
320 .map(|c| c.into())
321 .collect::<HashSet<_>>();
322 if !all_writer_features.is_empty() {
323 self.min_writer_version = 7;
324
325 match self.writer_features {
326 Some(mut features) => {
327 features.extend(all_writer_features);
328 self.writer_features = Some(features);
329 }
330 None => self.writer_features = Some(all_writer_features),
331 };
332 }
333 self
334 }
335
336 pub fn move_table_properties_into_features(
339 mut self,
340 configuration: &HashMap<String, String>,
341 ) -> Self {
342 fn parse_bool(value: &str) -> bool {
343 value.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v)
344 }
345
346 if self.min_writer_version >= 7 {
347 let mut converted_writer_features = configuration
349 .iter()
350 .filter(|(_, value)| value.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
351 .filter_map(|(key, value)| match key.as_str() {
352 "delta.enableChangeDataFeed" if parse_bool(value) => {
353 Some(TableFeature::ChangeDataFeed)
354 }
355 "delta.appendOnly" if parse_bool(value) => Some(TableFeature::AppendOnly),
356 "delta.enableDeletionVectors" if parse_bool(value) => {
357 Some(TableFeature::DeletionVectors)
358 }
359 "delta.enableRowTracking" if parse_bool(value) => {
360 Some(TableFeature::RowTracking)
361 }
362 "delta.checkpointPolicy" if value == "v2" => Some(TableFeature::V2Checkpoint),
363 _ => None,
364 })
365 .collect::<HashSet<TableFeature>>();
366
367 if configuration
368 .keys()
369 .any(|v| v.starts_with("delta.constraints."))
370 {
371 converted_writer_features.insert(TableFeature::CheckConstraints);
372 }
373
374 match self.writer_features {
375 Some(mut features) => {
376 features.extend(converted_writer_features);
377 self.writer_features = Some(features);
378 }
379 None => self.writer_features = Some(converted_writer_features),
380 }
381 }
382 if self.min_reader_version >= 3 {
383 let converted_reader_features = configuration
384 .iter()
385 .filter_map(|(key, value)| match key.as_str() {
386 "delta.enableDeletionVectors" if parse_bool(value) => {
387 Some(TableFeature::DeletionVectors)
388 }
389 "delta.checkpointPolicy" if value == "v2" => Some(TableFeature::V2Checkpoint),
390 _ => None,
391 })
392 .collect::<HashSet<TableFeature>>();
393 match self.reader_features {
394 Some(mut features) => {
395 features.extend(converted_reader_features);
396 self.reader_features = Some(features);
397 }
398 None => self.reader_features = Some(converted_reader_features),
399 }
400 }
401 self
402 }
403
404 pub fn apply_column_metadata_to_protocol(mut self, schema: &StructType) -> DeltaResult<Self> {
407 let generated_cols = schema.get_generated_columns()?;
408 let invariants = schema.get_invariants()?;
409 let contains_timestamp_ntz = self.contains_timestampntz(schema.fields());
410
411 if contains_timestamp_ntz {
412 self = self.enable_timestamp_ntz()
413 }
414
415 if !generated_cols.is_empty() {
416 self = self.enable_generated_columns()
417 }
418
419 if !invariants.is_empty() {
420 self = self.enable_invariants()
421 }
422
423 Ok(self)
424 }
425
426 pub fn apply_properties_to_protocol(
429 mut self,
430 new_properties: &HashMap<String, String>,
431 raise_if_not_exists: bool,
432 ) -> DeltaResult<Self> {
433 let mut parsed_properties: HashMap<TableProperty, String> = HashMap::new();
434
435 for (key, value) in new_properties {
436 if let Ok(parsed_key) = key.parse::<TableProperty>() {
437 parsed_properties.insert(parsed_key, value.to_string());
438 } else if raise_if_not_exists {
439 return Err(Error::Generic(format!(
440 "Error parsing property '{key}':'{value}'",
441 )));
442 }
443 }
444
445 if let Some(min_reader_version) = parsed_properties.get(&TableProperty::MinReaderVersion) {
447 let new_min_reader_version = min_reader_version.parse::<i32>();
448 match new_min_reader_version {
449 Ok(version) => match version {
450 1..=3 => {
451 if version > self.min_reader_version {
452 self.min_reader_version = version
453 }
454 }
455 _ => {
456 return Err(Error::Generic(format!(
457 "delta.minReaderVersion = '{min_reader_version}' is invalid, valid values are ['1','2','3']"
458 )));
459 }
460 },
461 Err(_) => {
462 return Err(Error::Generic(format!(
463 "delta.minReaderVersion = '{min_reader_version}' is invalid, valid values are ['1','2','3']"
464 )));
465 }
466 }
467 }
468
469 if let Some(min_writer_version) = parsed_properties.get(&TableProperty::MinWriterVersion) {
471 let new_min_writer_version = min_writer_version.parse::<i32>();
472 match new_min_writer_version {
473 Ok(version) => match version {
474 2..=7 => {
475 if version > self.min_writer_version {
476 self.min_writer_version = version
477 }
478 }
479 _ => {
480 return Err(Error::Generic(format!(
481 "delta.minWriterVersion = '{min_writer_version}' is invalid, valid values are ['2','3','4','5','6','7']"
482 )));
483 }
484 },
485 Err(_) => {
486 return Err(Error::Generic(format!(
487 "delta.minWriterVersion = '{min_writer_version}' is invalid, valid values are ['2','3','4','5','6','7']"
488 )));
489 }
490 }
491 }
492
493 if let Some(enable_cdf) = parsed_properties.get(&TableProperty::EnableChangeDataFeed) {
495 let if_enable_cdf = enable_cdf.to_ascii_lowercase().parse::<bool>();
496 match if_enable_cdf {
497 Ok(true) => {
498 if self.min_writer_version >= 7 {
499 match self.writer_features {
500 Some(mut features) => {
501 features.insert(TableFeature::ChangeDataFeed);
502 self.writer_features = Some(features);
503 }
504 None => {
505 self.writer_features =
506 Some(HashSet::from([TableFeature::ChangeDataFeed]))
507 }
508 }
509 } else if self.min_writer_version <= 3 {
510 self.min_writer_version = 4
511 }
512 }
513 Ok(false) => {}
514 _ => {
515 return Err(Error::Generic(format!(
516 "delta.enableChangeDataFeed = '{enable_cdf}' is invalid, valid values are ['true']"
517 )));
518 }
519 }
520 }
521
522 if let Some(enable_dv) = parsed_properties.get(&TableProperty::EnableDeletionVectors) {
523 let if_enable_dv = enable_dv.to_ascii_lowercase().parse::<bool>();
524 match if_enable_dv {
525 Ok(true) => {
526 let writer_features = match self.writer_features {
527 Some(mut features) => {
528 features.insert(TableFeature::DeletionVectors);
529 features
530 }
531 None => HashSet::from([TableFeature::DeletionVectors]),
532 };
533 let reader_features = match self.reader_features {
534 Some(mut features) => {
535 features.insert(TableFeature::DeletionVectors);
536 features
537 }
538 None => HashSet::from([TableFeature::DeletionVectors]),
539 };
540 self.min_reader_version = 3;
541 self.min_writer_version = 7;
542 self.writer_features = Some(writer_features);
543 self.reader_features = Some(reader_features);
544 }
545 Ok(false) => {}
546 _ => {
547 return Err(Error::Generic(format!(
548 "delta.enableDeletionVectors = '{enable_dv}' is invalid, valid values are ['true']"
549 )));
550 }
551 }
552 }
553 Ok(self)
554 }
555
556 fn contains_timestampntz<'a>(&self, fields: impl Iterator<Item = &'a StructField>) -> bool {
558 contains_timestampntz(fields)
559 }
560
561 fn enable_timestamp_ntz(mut self) -> Self {
563 self = self.append_reader_features([TableFeature::TimestampWithoutTimezone]);
564 self = self.append_writer_features([TableFeature::TimestampWithoutTimezone]);
565 self
566 }
567
568 fn enable_generated_columns(mut self) -> Self {
570 if self.min_writer_version < 4 {
571 self.min_writer_version = 4;
572 }
573 if self.min_writer_version >= 7 {
574 self = self.append_writer_features([TableFeature::GeneratedColumns]);
575 }
576 self
577 }
578
579 fn enable_invariants(mut self) -> Self {
581 if self.min_writer_version >= 7 {
582 self = self.append_writer_features([TableFeature::Invariants]);
583 }
584 self
585 }
586}
587
/// Delta table features known to this crate, named by their protocol identifiers.
///
/// Variants serialize as camelCase strings (e.g. `columnMapping`). The one exception is
/// `TimestampWithoutTimezone`, which serializes as `timestampNtz`.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum TableFeatures {
    /// `columnMapping`
    ColumnMapping,
    /// `deletionVectors`
    DeletionVectors,
    /// `timestampNtz` (explicit rename; camelCase of the variant name would differ)
    #[serde(rename = "timestampNtz")]
    TimestampWithoutTimezone,
    /// `v2Checkpoint`
    V2Checkpoint,
    /// `appendOnly`
    AppendOnly,
    /// `invariants`
    Invariants,
    /// `checkConstraints`
    CheckConstraints,
    /// `changeDataFeed`
    ChangeDataFeed,
    /// `generatedColumns`
    GeneratedColumns,
    /// `identityColumns`
    IdentityColumns,
    /// `rowTracking`
    RowTracking,
    /// `domainMetadata`
    DomainMetadata,
    /// `icebergCompatV1`
    IcebergCompatV1,
    /// `materializePartitionColumns`
    MaterializePartitionColumns,
}
621
622impl FromStr for TableFeatures {
623 type Err = ();
624
625 fn from_str(value: &str) -> Result<Self, Self::Err> {
626 match value {
627 "columnMapping" => Ok(TableFeatures::ColumnMapping),
628 "deletionVectors" => Ok(TableFeatures::DeletionVectors),
629 "timestampNtz" => Ok(TableFeatures::TimestampWithoutTimezone),
630 "v2Checkpoint" => Ok(TableFeatures::V2Checkpoint),
631 "appendOnly" => Ok(TableFeatures::AppendOnly),
632 "invariants" => Ok(TableFeatures::Invariants),
633 "checkConstraints" => Ok(TableFeatures::CheckConstraints),
634 "changeDataFeed" => Ok(TableFeatures::ChangeDataFeed),
635 "generatedColumns" => Ok(TableFeatures::GeneratedColumns),
636 "identityColumns" => Ok(TableFeatures::IdentityColumns),
637 "rowTracking" => Ok(TableFeatures::RowTracking),
638 "domainMetadata" => Ok(TableFeatures::DomainMetadata),
639 "icebergCompatV1" => Ok(TableFeatures::IcebergCompatV1),
640 "materializePartitionColumns" => Ok(TableFeatures::MaterializePartitionColumns),
641 _ => Err(()),
642 }
643 }
644}
645
646impl AsRef<str> for TableFeatures {
647 fn as_ref(&self) -> &str {
648 match self {
649 TableFeatures::ColumnMapping => "columnMapping",
650 TableFeatures::DeletionVectors => "deletionVectors",
651 TableFeatures::TimestampWithoutTimezone => "timestampNtz",
652 TableFeatures::V2Checkpoint => "v2Checkpoint",
653 TableFeatures::AppendOnly => "appendOnly",
654 TableFeatures::Invariants => "invariants",
655 TableFeatures::CheckConstraints => "checkConstraints",
656 TableFeatures::ChangeDataFeed => "changeDataFeed",
657 TableFeatures::GeneratedColumns => "generatedColumns",
658 TableFeatures::IdentityColumns => "identityColumns",
659 TableFeatures::RowTracking => "rowTracking",
660 TableFeatures::DomainMetadata => "domainMetadata",
661 TableFeatures::IcebergCompatV1 => "icebergCompatV1",
662 TableFeatures::MaterializePartitionColumns => "materializePartitionColumns",
663 }
664 }
665}
666
667impl fmt::Display for TableFeatures {
668 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
669 write!(f, "{}", self.as_ref())
670 }
671}
672
673impl TryFrom<&TableFeatures> for TableFeature {
674 type Error = strum::ParseError;
675
676 fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
677 TableFeature::try_from(value.as_ref())
678 }
679}
680
681impl TableFeatures {
682 pub fn to_reader_writer_features(&self) -> (Option<TableFeature>, Option<TableFeature>) {
684 let feature = TableFeature::try_from(self).ok();
685 match feature {
686 Some(feature) => {
687 match feature {
690 TableFeature::AppendOnly
691 | TableFeature::Invariants
692 | TableFeature::CheckConstraints
693 | TableFeature::ChangeDataFeed
694 | TableFeature::GeneratedColumns
695 | TableFeature::IdentityColumns
696 | TableFeature::InCommitTimestamp
697 | TableFeature::RowTracking
698 | TableFeature::DomainMetadata
699 | TableFeature::IcebergCompatV1
700 | TableFeature::IcebergCompatV2
701 | TableFeature::ClusteredTable
702 | TableFeature::MaterializePartitionColumns => (None, Some(feature)),
703
704 TableFeature::CatalogManaged
706 | TableFeature::CatalogOwnedPreview
707 | TableFeature::ColumnMapping
708 | TableFeature::DeletionVectors
709 | TableFeature::TimestampWithoutTimezone
710 | TableFeature::TypeWidening
711 | TableFeature::TypeWideningPreview
712 | TableFeature::V2Checkpoint
713 | TableFeature::VacuumProtocolCheck
714 | TableFeature::VariantType
715 | TableFeature::VariantTypePreview
716 | TableFeature::VariantShreddingPreview => {
717 (Some(feature.clone()), Some(feature))
718 }
719
720 TableFeature::Unknown(_) => (None, None),
722 }
723 }
724 None => (None, None),
725 }
726 }
727}
728
/// Where a deletion vector's data is stored, serialized as a one-letter code.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq, Default)]
pub enum StorageType {
    /// Code `"u"`; the default. Presumably a UUID-derived path relative to the table
    /// root, as in the Delta protocol. NOTE(review): confirm.
    #[serde(rename = "u")]
    #[default]
    UuidRelativePath,
    /// Code `"i"`: the deletion vector is stored inline in the descriptor.
    #[serde(rename = "i")]
    Inline,
    /// Code `"p"`: the deletion vector is stored at an absolute path.
    #[serde(rename = "p")]
    AbsolutePath,
}
743
744impl FromStr for StorageType {
745 type Err = Error;
746
747 fn from_str(s: &str) -> Result<Self, Self::Err> {
748 match s {
749 "u" => Ok(Self::UuidRelativePath),
750 "i" => Ok(Self::Inline),
751 "p" => Ok(Self::AbsolutePath),
752 _ => Err(Error::DeletionVector(format!(
753 "Unknown storage format: '{s}'."
754 ))),
755 }
756 }
757}
758
759impl AsRef<str> for StorageType {
760 fn as_ref(&self) -> &str {
761 match self {
762 Self::UuidRelativePath => "u",
763 Self::Inline => "i",
764 Self::AbsolutePath => "p",
765 }
766 }
767}
768
769impl Display for StorageType {
770 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
771 write!(f, "{}", self.as_ref())
772 }
773}
774
/// Reference to a deletion vector attached to a data file.
///
/// Serialized with camelCase keys (`storageType`, `pathOrInlineDv`, `sizeInBytes`, ...).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DeletionVectorDescriptor {
    /// How `path_or_inline_dv` should be interpreted; see [`StorageType`].
    pub storage_type: StorageType,

    /// A path or the inline-encoded deletion vector, depending on `storage_type`.
    /// NOTE(review): encoding details follow the Delta protocol; confirm there.
    pub path_or_inline_dv: String,

    /// Byte offset of the deletion vector within its file, if any.
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub offset: Option<i32>,

    /// Size of the stored deletion vector in bytes.
    pub size_in_bytes: i32,

    /// Presumably the number of rows the vector marks as deleted (Delta protocol
    /// `cardinality`). TODO confirm.
    pub cardinality: i64,
}
809
/// An `add` action: a data file that becomes part of the table.
///
/// Serialized with camelCase keys. Optional fields without `skip_serializing_if` are
/// written as JSON `null` when `None`.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub struct Add {
    /// Path of the data file. Percent-encoded on write and decoded on read via
    /// the `serde_path` module.
    #[serde(with = "serde_path")]
    pub path: String,

    /// Partition column name mapped to its value; a `None` value serializes as `null`.
    pub partition_values: HashMap<String, Option<String>>,

    /// Size of the data file in bytes.
    pub size: i64,

    /// Modification time of the file — presumably epoch milliseconds; TODO confirm.
    pub modification_time: i64,

    /// Whether this action changes the table's logical data (Delta `dataChange`).
    pub data_change: bool,

    /// File statistics as a JSON-encoded string, when present.
    pub stats: Option<String>,

    /// Optional key/value tags for the file.
    pub tags: Option<HashMap<String, Option<String>>>,

    /// Deletion vector for this file, if any; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_vector: Option<DeletionVectorDescriptor>,

    /// Row-tracking base row id, if assigned (Delta `baseRowId`).
    pub base_row_id: Option<i64>,

    /// Row-tracking default commit version, if assigned (Delta `defaultRowCommitVersion`).
    pub default_row_commit_version: Option<i64>,

    /// Name of the clustering provider that produced this file, if any.
    pub clustering_provider: Option<String>,
}
858
/// A `remove` action: a data file that is logically removed from the table.
///
/// Serialized with camelCase keys; every optional field is omitted from the JSON when
/// `None`. NOTE(review): `Eq` is derived but `PartialEq` is not, so a manual
/// `PartialEq` impl must exist elsewhere — confirm which fields it compares.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub struct Remove {
    /// Path of the removed file; percent-encoded/decoded via `serde_path`.
    #[serde(with = "serde_path")]
    pub path: String,

    /// Whether this removal changes the table's logical data (Delta `dataChange`).
    pub data_change: bool,

    /// Time of deletion — presumably epoch milliseconds; TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_timestamp: Option<i64>,

    /// Whether the extended file metadata fields below are populated.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extended_file_metadata: Option<bool>,

    /// Partition values of the removed file, if recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub partition_values: Option<HashMap<String, Option<String>>>,

    /// Size of the removed file in bytes, if recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub size: Option<i64>,

    /// Optional key/value tags for the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,

    /// Deletion vector that was attached to the file, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_vector: Option<DeletionVectorDescriptor>,

    /// Row-tracking base row id of the file, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub base_row_id: Option<i64>,

    /// Row-tracking default commit version of the file, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_row_commit_version: Option<i64>,
}
909
/// A `cdc` action: a file containing change-data-feed rows for a commit.
///
/// Serialized with camelCase keys.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct AddCDCFile {
    /// Path of the change-data file; percent-encoded/decoded via `serde_path`.
    #[serde(with = "serde_path")]
    pub path: String,

    /// Size of the file in bytes.
    pub size: i64,

    /// Partition column name mapped to its value.
    pub partition_values: HashMap<String, Option<String>>,

    /// Delta `dataChange` flag for this file.
    pub data_change: bool,

    /// Optional key/value tags; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
}
932
/// A `txn` action recording the latest version committed by an application.
///
/// Serialized with camelCase keys (`appId`, `version`, `lastUpdated`).
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Transaction {
    /// Identifier of the application that wrote the transaction.
    pub app_id: String,

    /// Application-defined version number.
    pub version: i64,

    /// Time of the last update — presumably epoch milliseconds; TODO confirm.
    /// Omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_updated: Option<i64>,
}
948
949impl Transaction {
950 pub fn new(app_id: impl ToString, version: i64) -> Self {
952 Self::new_with_last_update(app_id, version, None)
953 }
954
955 pub fn new_with_last_update(
957 app_id: impl ToString,
958 version: i64,
959 last_updated: Option<i64>,
960 ) -> Self {
961 Transaction {
962 app_id: app_id.to_string(),
963 version,
964 last_updated,
965 }
966 }
967}
968
/// A `commitInfo` action: provenance information about a commit.
///
/// Serialized with camelCase keys. Every `Option` field is omitted from the JSON when
/// `None`. Keys not matching a named field are collected in `info`.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct CommitInfo {
    /// Commit timestamp — presumably epoch milliseconds; TODO confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<i64>,

    /// Id of the user who made the commit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,

    /// Name of the user who made the commit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_name: Option<String>,

    /// Name of the operation that produced the commit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation: Option<String>,

    /// Operation parameters as arbitrary JSON values.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_parameters: Option<HashMap<String, serde_json::Value>>,

    /// Table version the operation read from, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub read_version: Option<i64>,

    /// Isolation level used for the commit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub isolation_level: Option<IsolationLevel>,

    /// Whether the commit was a blind append.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_blind_append: Option<bool>,

    /// Description of the engine that wrote the commit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub engine_info: Option<String>,

    /// Any additional keys, flattened into the top-level JSON object; empty by default.
    #[serde(flatten, default)]
    pub info: HashMap<String, serde_json::Value>,

    /// Free-form user metadata.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_metadata: Option<String>,
}
1019
/// A `domainMetadata` action: configuration attached to a named metadata domain.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DomainMetadata {
    /// Name of the metadata domain.
    pub domain: String,

    /// Domain configuration, stored as a string (presumably JSON; TODO confirm).
    pub configuration: String,

    /// Whether this action removes the domain.
    pub removed: bool,
}
1033
/// `checkpointMetadata` action found in checkpoints.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
pub struct CheckpointMetadata {
    /// Checkpoint flavor name.
    pub flavor: String,

    /// Optional key/value tags; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
}
1044
/// `sidecar` action: a file referenced from a checkpoint that holds part of its actions.
///
/// Serialized with camelCase keys, except `sidecar_type`, which is written as `type`.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Sidecar {
    /// Name of the sidecar file.
    pub file_name: String,

    /// Size of the sidecar file in bytes.
    pub size_in_bytes: i64,

    /// Modification time of the sidecar file — presumably epoch milliseconds; TODO confirm.
    pub modification_time: i64,

    /// Sidecar type, serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub sidecar_type: String,

    /// Optional key/value tags; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
}
1069
/// Isolation level a transaction was committed under.
///
/// Serialized using the variant names as written (e.g. `"WriteSerializable"`).
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Default)]
pub enum IsolationLevel {
    /// The strongest level; this is the default.
    #[default]
    Serializable,

    /// Write-serializable isolation.
    WriteSerializable,

    /// Snapshot isolation.
    SnapshotIsolation,
}
1094
1095impl AsRef<str> for IsolationLevel {
1099 fn as_ref(&self) -> &str {
1100 match self {
1101 Self::Serializable => "Serializable",
1102 Self::WriteSerializable => "WriteSerializable",
1103 Self::SnapshotIsolation => "SnapshotIsolation",
1104 }
1105 }
1106}
1107
1108impl FromStr for IsolationLevel {
1109 type Err = Error;
1110
1111 fn from_str(s: &str) -> Result<Self, Self::Err> {
1112 match s.to_ascii_lowercase().as_str() {
1113 "serializable" => Ok(Self::Serializable),
1114 "writeserializable" | "write_serializable" => Ok(Self::WriteSerializable),
1115 "snapshotisolation" | "snapshot_isolation" => Ok(Self::SnapshotIsolation),
1116 _ => Err(Error::Generic("Invalid string for IsolationLevel".into())),
1117 }
1118 }
1119}
1120
1121pub(crate) mod serde_path {
1122 use std::str::Utf8Error;
1123
1124 use percent_encoding::{AsciiSet, CONTROLS, percent_decode_str, percent_encode};
1125 use serde::{self, Deserialize, Deserializer, Serialize, Serializer};
1126
1127 pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
1128 where
1129 D: Deserializer<'de>,
1130 {
1131 let s = String::deserialize(deserializer)?;
1132 decode_path(&s).map_err(serde::de::Error::custom)
1133 }
1134
1135 pub fn serialize<S>(value: &str, serializer: S) -> Result<S::Ok, S::Error>
1136 where
1137 S: Serializer,
1138 {
1139 let encoded = encode_path(value);
1140 String::serialize(&encoded, serializer)
1141 }
1142
1143 pub const _DELIMITER: &str = "/";
1144 pub const _DELIMITER_BYTE: u8 = _DELIMITER.as_bytes()[0];
1146
1147 const INVALID: &AsciiSet = &CONTROLS
1149 .add(b'\\')
1154 .add(b'{')
1155 .add(b'^')
1156 .add(b'}')
1157 .add(b'%')
1158 .add(b'`')
1159 .add(b']')
1160 .add(b'"')
1161 .add(b'>')
1162 .add(b'[')
1163 .add(b'<')
1165 .add(b'#')
1166 .add(b'|')
1167 .add(b'\r')
1170 .add(b'\n')
1171 .add(b'*')
1172 .add(b'?');
1173
1174 fn encode_path(path: &str) -> String {
1175 percent_encode(path.as_bytes(), INVALID).to_string()
1176 }
1177
1178 pub fn decode_path(path: &str) -> Result<String, Utf8Error> {
1179 Ok(percent_decode_str(path).decode_utf8()?.to_string())
1180 }
1181}
1182
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::PrimitiveType;

    #[test]
    fn test_primitive() {
        // The JSON string "string" must deserialize to the `string` primitive type.
        let types: PrimitiveType = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(types, PrimitiveType::String);
    }

    #[test]
    fn test_deserialize_protocol() {
        let raw = serde_json::json!(
            {
                "minReaderVersion": 3,
                "minWriterVersion": 7,
                "readerFeatures": ["catalogOwned"],
                "writerFeatures": ["catalogOwned", "invariants", "appendOnly"]
            }
        );
        let protocol: Protocol = serde_json::from_value(raw).unwrap();
        assert_eq!(protocol.min_reader_version(), 3);
        assert_eq!(protocol.min_writer_version(), 7);
        assert_eq!(
            protocol.reader_features(),
            Some(vec![TableFeature::Unknown("catalogOwned".to_owned())].as_slice())
        );
        assert_eq!(
            protocol.writer_features(),
            Some(
                vec![
                    TableFeature::Unknown("catalogOwned".to_owned()),
                    TableFeature::Invariants,
                    TableFeature::AppendOnly
                ]
                .as_slice()
            )
        );
    }

    #[test]
    fn test_table_features_string_round_trip() {
        let all = [
            TableFeatures::ColumnMapping,
            TableFeatures::DeletionVectors,
            TableFeatures::TimestampWithoutTimezone,
            TableFeatures::V2Checkpoint,
            TableFeatures::AppendOnly,
            TableFeatures::Invariants,
            TableFeatures::CheckConstraints,
            TableFeatures::ChangeDataFeed,
            TableFeatures::GeneratedColumns,
            TableFeatures::IdentityColumns,
            TableFeatures::RowTracking,
            TableFeatures::DomainMetadata,
            TableFeatures::IcebergCompatV1,
            TableFeatures::MaterializePartitionColumns,
        ];
        for feature in all {
            assert_eq!(feature.as_ref().parse::<TableFeatures>(), Ok(feature.clone()));
            assert_eq!(feature.to_string(), feature.as_ref());
        }
        assert_eq!(TableFeatures::TimestampWithoutTimezone.to_string(), "timestampNtz");
        assert!("notAFeature".parse::<TableFeatures>().is_err());
    }

    #[test]
    fn test_table_features_reader_writer_split() {
        assert_eq!(
            TableFeatures::AppendOnly.to_reader_writer_features(),
            (None, Some(TableFeature::AppendOnly))
        );
        assert_eq!(
            TableFeatures::DeletionVectors.to_reader_writer_features(),
            (
                Some(TableFeature::DeletionVectors),
                Some(TableFeature::DeletionVectors)
            )
        );
    }

    #[test]
    fn test_storage_type_codes() {
        assert_eq!(StorageType::default(), StorageType::UuidRelativePath);
        for (code, expected) in [
            ("u", StorageType::UuidRelativePath),
            ("i", StorageType::Inline),
            ("p", StorageType::AbsolutePath),
        ] {
            assert_eq!(code.parse::<StorageType>().unwrap(), expected);
            assert_eq!(expected.to_string(), code);
        }
        assert!("x".parse::<StorageType>().is_err());
    }

    #[test]
    fn test_isolation_level_parsing() {
        assert_eq!(IsolationLevel::default(), IsolationLevel::Serializable);
        assert_eq!(
            "Serializable".parse::<IsolationLevel>().unwrap(),
            IsolationLevel::Serializable
        );
        assert_eq!(
            "WriteSerializable".parse::<IsolationLevel>().unwrap(),
            IsolationLevel::WriteSerializable
        );
        assert_eq!(
            "snapshot_isolation".parse::<IsolationLevel>().unwrap(),
            IsolationLevel::SnapshotIsolation
        );
        assert_eq!(IsolationLevel::WriteSerializable.as_ref(), "WriteSerializable");
        assert!("bogus".parse::<IsolationLevel>().is_err());
    }

    #[test]
    fn test_transaction_constructors() {
        assert_eq!(
            Transaction::new("app", 3),
            Transaction {
                app_id: "app".to_string(),
                version: 3,
                last_updated: None,
            }
        );
        assert_eq!(
            Transaction::new_with_last_update("app", 4, Some(10)).last_updated,
            Some(10)
        );
    }
}