// deltalake_core/kernel/models/actions.rs

1use std::collections::{HashMap, HashSet};
2use std::fmt::{self, Display};
3use std::str::FromStr;
4
5use delta_kernel::schema::{DataType, StructField};
6use delta_kernel::table_features::TableFeature;
7use serde::{Deserialize, Serialize};
8
9use crate::TableProperty;
10use crate::kernel::{DeltaResult, error::Error};
11use crate::kernel::{StructType, StructTypeExt};
12
13pub use delta_kernel::actions::{Metadata, Protocol};
14
15/// Please don't use, this API will be leaving shortly!
16///
17/// Since the adoption of delta-kernel-rs we lost the direct ability to create [Metadata] actions
18/// which is required for some use-cases.
19///
20/// Upstream tracked here: <https://github.com/delta-io/delta-kernel-rs/issues/1055>
21pub fn new_metadata(
22    schema: &StructType,
23    partition_columns: impl IntoIterator<Item = impl ToString>,
24    configuration: impl IntoIterator<Item = (impl ToString, impl ToString)>,
25) -> DeltaResult<Metadata> {
26    // this ugliness is a stop-gap until we resolve: https://github.com/delta-io/delta-kernel-rs/issues/1055
27    let value = serde_json::json!({
28        "id": uuid::Uuid::new_v4().to_string(),
29        "name": None::<String>,
30        "description": None::<String>,
31        "format": { "provider": "parquet", "options": {} },
32        "schemaString": serde_json::to_string(schema)?,
33        "partitionColumns": partition_columns.into_iter().map(|c| c.to_string()).collect::<Vec<_>>(),
34        "configuration": configuration.into_iter().map(|(k, v)| (k.to_string(), v.to_string())).collect::<HashMap<_, _>>(),
35        "createdTime": chrono::Utc::now().timestamp_millis(),
36    });
37    Ok(serde_json::from_value(value)?)
38}
39
40/// Extension trait for Metadata action
41///
42/// This trait is a stop-gap to adopt the Metadata action from delta-kernel-rs
43/// while the update / mutation APIs are being implemented. It allows us to implement
44/// additional APIs on the Metadata action and hide specifics of how we do the updates.
pub trait MetadataExt {
    /// Replace the table id on this metadata action.
    fn with_table_id(self, table_id: String) -> DeltaResult<Metadata>;

    /// Replace the user-facing table name.
    fn with_name(self, name: String) -> DeltaResult<Metadata>;

    /// Replace the table description.
    fn with_description(self, description: String) -> DeltaResult<Metadata>;

    /// Replace the table schema; it is re-serialized into `schemaString`.
    fn with_schema(self, schema: &StructType) -> DeltaResult<Metadata>;

    /// Insert (or overwrite) a single key in the table configuration.
    fn add_config_key(self, key: String, value: String) -> DeltaResult<Metadata>;

    /// Remove a key from the table configuration; removing a missing key is a no-op.
    fn remove_config_key(self, key: &str) -> DeltaResult<Metadata>;
}
58
59impl MetadataExt for Metadata {
60    fn with_table_id(self, table_id: String) -> DeltaResult<Metadata> {
61        let value = serde_json::json!({
62            "id": table_id,
63            "name": self.name(),
64            "description": self.description(),
65            "format": { "provider": "parquet", "options": {} },
66            "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
67            "partitionColumns": self.partition_columns(),
68            "configuration": self.configuration(),
69            "createdTime": self.created_time(),
70        });
71        Ok(serde_json::from_value(value)?)
72    }
73
74    fn with_name(self, name: String) -> DeltaResult<Metadata> {
75        let value = serde_json::json!({
76            "id": self.id(),
77            "name": name,
78            "description": self.description(),
79            "format": { "provider": "parquet", "options": {} },
80            "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
81            "partitionColumns": self.partition_columns(),
82            "configuration": self.configuration(),
83            "createdTime": self.created_time(),
84        });
85        Ok(serde_json::from_value(value)?)
86    }
87
88    fn with_description(self, description: String) -> DeltaResult<Metadata> {
89        let value = serde_json::json!({
90            "id": self.id(),
91            "name": self.name(),
92            "description": description,
93            "format": { "provider": "parquet", "options": {} },
94            "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
95            "partitionColumns": self.partition_columns(),
96            "configuration": self.configuration(),
97            "createdTime": self.created_time(),
98        });
99        Ok(serde_json::from_value(value)?)
100    }
101
102    fn with_schema(self, schema: &StructType) -> DeltaResult<Metadata> {
103        let value = serde_json::json!({
104            "id": self.id(),
105            "name": self.name(),
106            "description": self.description(),
107            "format": { "provider": "parquet", "options": {} },
108            "schemaString": serde_json::to_string(schema)?,
109            "partitionColumns": self.partition_columns(),
110            "configuration": self.configuration(),
111            "createdTime": self.created_time(),
112        });
113        Ok(serde_json::from_value(value)?)
114    }
115
116    fn add_config_key(self, key: String, value: String) -> DeltaResult<Metadata> {
117        let mut config = self.configuration().clone();
118        config.insert(key, value);
119        let value = serde_json::json!({
120            "id": self.id(),
121            "name": self.name(),
122            "description": self.description(),
123            "format": { "provider": "parquet", "options": {} },
124            "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
125            "partitionColumns": self.partition_columns(),
126            "configuration": config,
127            "createdTime": self.created_time(),
128        });
129        Ok(serde_json::from_value(value)?)
130    }
131
132    fn remove_config_key(self, key: &str) -> DeltaResult<Metadata> {
133        let mut config = self.configuration().clone();
134        config.remove(key);
135        let value = serde_json::json!({
136            "id": self.id(),
137            "name": self.name(),
138            "description": self.description(),
139            "format": { "provider": "parquet", "options": {} },
140            "schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
141            "partitionColumns": self.partition_columns(),
142            "configuration": config,
143            "createdTime": self.created_time(),
144        });
145        Ok(serde_json::from_value(value)?)
146    }
147}
148
149/// checks if table contains timestamp_ntz in any field including nested fields.
150pub fn contains_timestampntz<'a>(mut fields: impl Iterator<Item = &'a StructField>) -> bool {
151    fn _check_type(dtype: &DataType) -> bool {
152        match dtype {
153            &DataType::TIMESTAMP_NTZ => true,
154            DataType::Array(inner) => _check_type(inner.element_type()),
155            DataType::Struct(inner) => inner.fields().any(|f| _check_type(f.data_type())),
156            _ => false,
157        }
158    }
159    fields.any(|f| _check_type(f.data_type()))
160}
161
/// Extension trait for delta-kernel Protocol action.
///
/// Allows us to extend the Protocol struct with additional methods
/// to update the protocol actions.
pub(crate) trait ProtocolExt {
    /// Reader features as an owned set, or `None` when the protocol declares none.
    fn reader_features_set(&self) -> Option<HashSet<TableFeature>>;
    /// Writer features as an owned set, or `None` when the protocol declares none.
    fn writer_features_set(&self) -> Option<HashSet<TableFeature>>;
    /// Add reader features, bumping the minimum reader version as required.
    fn append_reader_features(self, reader_features: &[TableFeature]) -> Protocol;
    /// Add writer features, bumping the minimum writer version as required.
    fn append_writer_features(self, writer_features: &[TableFeature]) -> Protocol;
    /// Translate enabled table properties into their corresponding table features.
    fn move_table_properties_into_features(
        self,
        configuration: &HashMap<String, String>,
    ) -> Protocol;
    /// Enable features implied by column metadata (timestamp_ntz, generated columns, invariants).
    fn apply_column_metadata_to_protocol(self, schema: &StructType) -> DeltaResult<Protocol>;
    /// Apply raw table properties to the protocol, optionally failing on unknown keys.
    fn apply_properties_to_protocol(
        self,
        new_properties: &HashMap<String, String>,
        raise_if_not_exists: bool,
    ) -> DeltaResult<Protocol>;
}
182
183impl ProtocolExt for Protocol {
184    fn reader_features_set(&self) -> Option<HashSet<TableFeature>> {
185        self.reader_features()
186            .map(|features| features.iter().cloned().collect())
187    }
188
189    fn writer_features_set(&self) -> Option<HashSet<TableFeature>> {
190        self.writer_features()
191            .map(|features| features.iter().cloned().collect())
192    }
193
194    fn append_reader_features(self, reader_features: &[TableFeature]) -> Protocol {
195        let mut inner = ProtocolInner::from_kernel(&self);
196        inner = inner.append_reader_features(reader_features.iter().cloned());
197        inner.as_kernel()
198    }
199
200    fn append_writer_features(self, writer_features: &[TableFeature]) -> Protocol {
201        let mut inner = ProtocolInner::from_kernel(&self);
202        inner = inner.append_writer_features(writer_features.iter().cloned());
203        inner.as_kernel()
204    }
205
206    fn move_table_properties_into_features(
207        self,
208        configuration: &HashMap<String, String>,
209    ) -> Protocol {
210        let mut inner = ProtocolInner::from_kernel(&self);
211        inner = inner.move_table_properties_into_features(configuration);
212        inner.as_kernel()
213    }
214
215    fn apply_column_metadata_to_protocol(self, schema: &StructType) -> DeltaResult<Protocol> {
216        let mut inner = ProtocolInner::from_kernel(&self);
217        inner = inner.apply_column_metadata_to_protocol(schema)?;
218        Ok(inner.as_kernel())
219    }
220
221    fn apply_properties_to_protocol(
222        self,
223        new_properties: &HashMap<String, String>,
224        raise_if_not_exists: bool,
225    ) -> DeltaResult<Protocol> {
226        let mut inner = ProtocolInner::from_kernel(&self);
227        inner = inner.apply_properties_to_protocol(new_properties, raise_if_not_exists)?;
228        Ok(inner.as_kernel())
229    }
230}
231
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
/// Temporary Shim to facilitate adoption of kernel protocol.
///
/// This is more or less our old local implementation of protocol. We keep it around
/// to use the various update and translation methods defined in this struct and
/// use it to proxy updates to the kernel protocol action.
///
/// The camelCase serde renaming keeps field names aligned with the JSON shape of the
/// kernel Protocol action, which `from_kernel`/`as_kernel` rely on for conversion.
///
// TODO: Remove once we can use kernel protocol update APIs.
pub(crate) struct ProtocolInner {
    /// The minimum version of the Delta read protocol that a client must implement
    /// in order to correctly read this table
    pub min_reader_version: i32,
    /// The minimum version of the Delta write protocol that a client must implement
    /// in order to correctly write this table
    pub min_writer_version: i32,
    /// A collection of features that a client must implement in order to correctly
    /// read this table (exist only when minReaderVersion is set to 3)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reader_features: Option<HashSet<TableFeature>>,
    /// A collection of features that a client must implement in order to correctly
    /// write this table (exist only when minWriterVersion is set to 7)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub writer_features: Option<HashSet<TableFeature>>,
}
257
258impl Default for ProtocolInner {
259    fn default() -> Self {
260        Self {
261            min_reader_version: 1,
262            min_writer_version: 2,
263            reader_features: None,
264            writer_features: None,
265        }
266    }
267}
268
269impl ProtocolInner {
270    /// Create a new protocol action
271    #[cfg(test)]
272    pub(crate) fn new(min_reader_version: i32, min_writer_version: i32) -> Self {
273        Self {
274            min_reader_version,
275            min_writer_version,
276            reader_features: None,
277            writer_features: None,
278        }
279    }
280
281    pub(crate) fn from_kernel(value: &Protocol) -> ProtocolInner {
282        // this ugliness is a stop-gap until we resolve: https://github.com/delta-io/delta-kernel-rs/issues/1055
283        serde_json::from_value(serde_json::to_value(value).unwrap()).unwrap()
284    }
285
286    pub(crate) fn as_kernel(&self) -> Protocol {
287        // this ugliness is a stop-gap until we resolve: https://github.com/delta-io/delta-kernel-rs/issues/1055
288        serde_json::from_value(serde_json::to_value(self).unwrap()).unwrap()
289    }
290
291    /// Append the reader features in the protocol action, automatically bumps min_reader_version
292    pub fn append_reader_features(
293        mut self,
294        reader_features: impl IntoIterator<Item = impl Into<TableFeature>>,
295    ) -> Self {
296        let all_reader_features = reader_features
297            .into_iter()
298            .map(Into::into)
299            .collect::<HashSet<_>>();
300        if !all_reader_features.is_empty() {
301            self.min_reader_version = 3;
302            match self.reader_features {
303                Some(mut features) => {
304                    features.extend(all_reader_features);
305                    self.reader_features = Some(features);
306                }
307                None => self.reader_features = Some(all_reader_features),
308            };
309        }
310        self
311    }
312
313    /// Append the writer features in the protocol action, automatically bumps min_writer_version
314    pub fn append_writer_features(
315        mut self,
316        writer_features: impl IntoIterator<Item = impl Into<TableFeature>>,
317    ) -> Self {
318        let all_writer_features = writer_features
319            .into_iter()
320            .map(|c| c.into())
321            .collect::<HashSet<_>>();
322        if !all_writer_features.is_empty() {
323            self.min_writer_version = 7;
324
325            match self.writer_features {
326                Some(mut features) => {
327                    features.extend(all_writer_features);
328                    self.writer_features = Some(features);
329                }
330                None => self.writer_features = Some(all_writer_features),
331            };
332        }
333        self
334    }
335
336    /// Converts existing properties into features if the reader_version is >=3 or writer_version >=7
337    /// only converts features that are "true"
338    pub fn move_table_properties_into_features(
339        mut self,
340        configuration: &HashMap<String, String>,
341    ) -> Self {
342        fn parse_bool(value: &str) -> bool {
343            value.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v)
344        }
345
346        if self.min_writer_version >= 7 {
347            // TODO: move this is in future to use delta_kernel::table_properties
348            let mut converted_writer_features = configuration
349                .iter()
350                .filter(|(_, value)| value.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
351                .filter_map(|(key, value)| match key.as_str() {
352                    "delta.enableChangeDataFeed" if parse_bool(value) => {
353                        Some(TableFeature::ChangeDataFeed)
354                    }
355                    "delta.appendOnly" if parse_bool(value) => Some(TableFeature::AppendOnly),
356                    "delta.enableDeletionVectors" if parse_bool(value) => {
357                        Some(TableFeature::DeletionVectors)
358                    }
359                    "delta.enableRowTracking" if parse_bool(value) => {
360                        Some(TableFeature::RowTracking)
361                    }
362                    "delta.checkpointPolicy" if value == "v2" => Some(TableFeature::V2Checkpoint),
363                    _ => None,
364                })
365                .collect::<HashSet<TableFeature>>();
366
367            if configuration
368                .keys()
369                .any(|v| v.starts_with("delta.constraints."))
370            {
371                converted_writer_features.insert(TableFeature::CheckConstraints);
372            }
373
374            match self.writer_features {
375                Some(mut features) => {
376                    features.extend(converted_writer_features);
377                    self.writer_features = Some(features);
378                }
379                None => self.writer_features = Some(converted_writer_features),
380            }
381        }
382        if self.min_reader_version >= 3 {
383            let converted_reader_features = configuration
384                .iter()
385                .filter_map(|(key, value)| match key.as_str() {
386                    "delta.enableDeletionVectors" if parse_bool(value) => {
387                        Some(TableFeature::DeletionVectors)
388                    }
389                    "delta.checkpointPolicy" if value == "v2" => Some(TableFeature::V2Checkpoint),
390                    _ => None,
391                })
392                .collect::<HashSet<TableFeature>>();
393            match self.reader_features {
394                Some(mut features) => {
395                    features.extend(converted_reader_features);
396                    self.reader_features = Some(features);
397                }
398                None => self.reader_features = Some(converted_reader_features),
399            }
400        }
401        self
402    }
403
404    /// Will apply the column metadata to the protocol by either bumping the version or setting
405    /// features
406    pub fn apply_column_metadata_to_protocol(mut self, schema: &StructType) -> DeltaResult<Self> {
407        let generated_cols = schema.get_generated_columns()?;
408        let invariants = schema.get_invariants()?;
409        let contains_timestamp_ntz = self.contains_timestampntz(schema.fields());
410
411        if contains_timestamp_ntz {
412            self = self.enable_timestamp_ntz()
413        }
414
415        if !generated_cols.is_empty() {
416            self = self.enable_generated_columns()
417        }
418
419        if !invariants.is_empty() {
420            self = self.enable_invariants()
421        }
422
423        Ok(self)
424    }
425
426    /// Will apply the properties to the protocol by either bumping the version or setting
427    /// features
428    pub fn apply_properties_to_protocol(
429        mut self,
430        new_properties: &HashMap<String, String>,
431        raise_if_not_exists: bool,
432    ) -> DeltaResult<Self> {
433        let mut parsed_properties: HashMap<TableProperty, String> = HashMap::new();
434
435        for (key, value) in new_properties {
436            if let Ok(parsed_key) = key.parse::<TableProperty>() {
437                parsed_properties.insert(parsed_key, value.to_string());
438            } else if raise_if_not_exists {
439                return Err(Error::Generic(format!(
440                    "Error parsing property '{key}':'{value}'",
441                )));
442            }
443        }
444
445        // Check and update delta.minReaderVersion
446        if let Some(min_reader_version) = parsed_properties.get(&TableProperty::MinReaderVersion) {
447            let new_min_reader_version = min_reader_version.parse::<i32>();
448            match new_min_reader_version {
449                Ok(version) => match version {
450                    1..=3 => {
451                        if version > self.min_reader_version {
452                            self.min_reader_version = version
453                        }
454                    }
455                    _ => {
456                        return Err(Error::Generic(format!(
457                            "delta.minReaderVersion = '{min_reader_version}' is invalid, valid values are ['1','2','3']"
458                        )));
459                    }
460                },
461                Err(_) => {
462                    return Err(Error::Generic(format!(
463                        "delta.minReaderVersion = '{min_reader_version}' is invalid, valid values are ['1','2','3']"
464                    )));
465                }
466            }
467        }
468
469        // Check and update delta.minWriterVersion
470        if let Some(min_writer_version) = parsed_properties.get(&TableProperty::MinWriterVersion) {
471            let new_min_writer_version = min_writer_version.parse::<i32>();
472            match new_min_writer_version {
473                Ok(version) => match version {
474                    2..=7 => {
475                        if version > self.min_writer_version {
476                            self.min_writer_version = version
477                        }
478                    }
479                    _ => {
480                        return Err(Error::Generic(format!(
481                            "delta.minWriterVersion = '{min_writer_version}' is invalid, valid values are ['2','3','4','5','6','7']"
482                        )));
483                    }
484                },
485                Err(_) => {
486                    return Err(Error::Generic(format!(
487                        "delta.minWriterVersion = '{min_writer_version}' is invalid, valid values are ['2','3','4','5','6','7']"
488                    )));
489                }
490            }
491        }
492
493        // Check enableChangeDataFeed and bump protocol or add writerFeature if writer versions is >=7
494        if let Some(enable_cdf) = parsed_properties.get(&TableProperty::EnableChangeDataFeed) {
495            let if_enable_cdf = enable_cdf.to_ascii_lowercase().parse::<bool>();
496            match if_enable_cdf {
497                Ok(true) => {
498                    if self.min_writer_version >= 7 {
499                        match self.writer_features {
500                            Some(mut features) => {
501                                features.insert(TableFeature::ChangeDataFeed);
502                                self.writer_features = Some(features);
503                            }
504                            None => {
505                                self.writer_features =
506                                    Some(HashSet::from([TableFeature::ChangeDataFeed]))
507                            }
508                        }
509                    } else if self.min_writer_version <= 3 {
510                        self.min_writer_version = 4
511                    }
512                }
513                Ok(false) => {}
514                _ => {
515                    return Err(Error::Generic(format!(
516                        "delta.enableChangeDataFeed = '{enable_cdf}' is invalid, valid values are ['true']"
517                    )));
518                }
519            }
520        }
521
522        if let Some(enable_dv) = parsed_properties.get(&TableProperty::EnableDeletionVectors) {
523            let if_enable_dv = enable_dv.to_ascii_lowercase().parse::<bool>();
524            match if_enable_dv {
525                Ok(true) => {
526                    let writer_features = match self.writer_features {
527                        Some(mut features) => {
528                            features.insert(TableFeature::DeletionVectors);
529                            features
530                        }
531                        None => HashSet::from([TableFeature::DeletionVectors]),
532                    };
533                    let reader_features = match self.reader_features {
534                        Some(mut features) => {
535                            features.insert(TableFeature::DeletionVectors);
536                            features
537                        }
538                        None => HashSet::from([TableFeature::DeletionVectors]),
539                    };
540                    self.min_reader_version = 3;
541                    self.min_writer_version = 7;
542                    self.writer_features = Some(writer_features);
543                    self.reader_features = Some(reader_features);
544                }
545                Ok(false) => {}
546                _ => {
547                    return Err(Error::Generic(format!(
548                        "delta.enableDeletionVectors = '{enable_dv}' is invalid, valid values are ['true']"
549                    )));
550                }
551            }
552        }
553        Ok(self)
554    }
555
556    /// checks if table contains timestamp_ntz in any field including nested fields.
557    fn contains_timestampntz<'a>(&self, fields: impl Iterator<Item = &'a StructField>) -> bool {
558        contains_timestampntz(fields)
559    }
560
561    /// Enable timestamp_ntz in the protocol
562    fn enable_timestamp_ntz(mut self) -> Self {
563        self = self.append_reader_features([TableFeature::TimestampWithoutTimezone]);
564        self = self.append_writer_features([TableFeature::TimestampWithoutTimezone]);
565        self
566    }
567
568    /// Enabled generated columns
569    fn enable_generated_columns(mut self) -> Self {
570        if self.min_writer_version < 4 {
571            self.min_writer_version = 4;
572        }
573        if self.min_writer_version >= 7 {
574            self = self.append_writer_features([TableFeature::GeneratedColumns]);
575        }
576        self
577    }
578
579    /// Enabled generated columns
580    fn enable_invariants(mut self) -> Self {
581        if self.min_writer_version >= 7 {
582            self = self.append_writer_features([TableFeature::Invariants]);
583        }
584        self
585    }
586}
587
/// High level table features
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum TableFeatures {
    /// Mapping of one column to another
    ColumnMapping,
    /// Deletion vectors for merge, update, delete
    DeletionVectors,
    /// timestamps without timezone support
    #[serde(rename = "timestampNtz")]
    TimestampWithoutTimezone,
    /// version 2 of checkpointing
    V2Checkpoint,
    /// Append Only Tables
    AppendOnly,
    /// Table invariants
    Invariants,
    /// Check constraints on columns
    CheckConstraints,
    /// CDF on a table
    ChangeDataFeed,
    /// Columns with generated values
    GeneratedColumns,
    /// ID Columns
    IdentityColumns,
    /// Row tracking on tables
    RowTracking,
    /// domain specific metadata
    DomainMetadata,
    /// Iceberg compatibility support
    IcebergCompatV1,
    /// Materialized partition columns
    MaterializePartitionColumns,
}
621
622impl FromStr for TableFeatures {
623    type Err = ();
624
625    fn from_str(value: &str) -> Result<Self, Self::Err> {
626        match value {
627            "columnMapping" => Ok(TableFeatures::ColumnMapping),
628            "deletionVectors" => Ok(TableFeatures::DeletionVectors),
629            "timestampNtz" => Ok(TableFeatures::TimestampWithoutTimezone),
630            "v2Checkpoint" => Ok(TableFeatures::V2Checkpoint),
631            "appendOnly" => Ok(TableFeatures::AppendOnly),
632            "invariants" => Ok(TableFeatures::Invariants),
633            "checkConstraints" => Ok(TableFeatures::CheckConstraints),
634            "changeDataFeed" => Ok(TableFeatures::ChangeDataFeed),
635            "generatedColumns" => Ok(TableFeatures::GeneratedColumns),
636            "identityColumns" => Ok(TableFeatures::IdentityColumns),
637            "rowTracking" => Ok(TableFeatures::RowTracking),
638            "domainMetadata" => Ok(TableFeatures::DomainMetadata),
639            "icebergCompatV1" => Ok(TableFeatures::IcebergCompatV1),
640            "materializePartitionColumns" => Ok(TableFeatures::MaterializePartitionColumns),
641            _ => Err(()),
642        }
643    }
644}
645
646impl AsRef<str> for TableFeatures {
647    fn as_ref(&self) -> &str {
648        match self {
649            TableFeatures::ColumnMapping => "columnMapping",
650            TableFeatures::DeletionVectors => "deletionVectors",
651            TableFeatures::TimestampWithoutTimezone => "timestampNtz",
652            TableFeatures::V2Checkpoint => "v2Checkpoint",
653            TableFeatures::AppendOnly => "appendOnly",
654            TableFeatures::Invariants => "invariants",
655            TableFeatures::CheckConstraints => "checkConstraints",
656            TableFeatures::ChangeDataFeed => "changeDataFeed",
657            TableFeatures::GeneratedColumns => "generatedColumns",
658            TableFeatures::IdentityColumns => "identityColumns",
659            TableFeatures::RowTracking => "rowTracking",
660            TableFeatures::DomainMetadata => "domainMetadata",
661            TableFeatures::IcebergCompatV1 => "icebergCompatV1",
662            TableFeatures::MaterializePartitionColumns => "materializePartitionColumns",
663        }
664    }
665}
666
667impl fmt::Display for TableFeatures {
668    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
669        write!(f, "{}", self.as_ref())
670    }
671}
672
673impl TryFrom<&TableFeatures> for TableFeature {
674    type Error = strum::ParseError;
675
676    fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
677        TableFeature::try_from(value.as_ref())
678    }
679}
680
681impl TableFeatures {
682    /// Convert table feature to respective reader or/and write feature
683    pub fn to_reader_writer_features(&self) -> (Option<TableFeature>, Option<TableFeature>) {
684        let feature = TableFeature::try_from(self).ok();
685        match feature {
686            Some(feature) => {
687                // Classify features based on their type
688                // Writer-only features
689                match feature {
690                    TableFeature::AppendOnly
691                    | TableFeature::Invariants
692                    | TableFeature::CheckConstraints
693                    | TableFeature::ChangeDataFeed
694                    | TableFeature::GeneratedColumns
695                    | TableFeature::IdentityColumns
696                    | TableFeature::InCommitTimestamp
697                    | TableFeature::RowTracking
698                    | TableFeature::DomainMetadata
699                    | TableFeature::IcebergCompatV1
700                    | TableFeature::IcebergCompatV2
701                    | TableFeature::ClusteredTable
702                    | TableFeature::MaterializePartitionColumns => (None, Some(feature)),
703
704                    // ReaderWriter features
705                    TableFeature::CatalogManaged
706                    | TableFeature::CatalogOwnedPreview
707                    | TableFeature::ColumnMapping
708                    | TableFeature::DeletionVectors
709                    | TableFeature::TimestampWithoutTimezone
710                    | TableFeature::TypeWidening
711                    | TableFeature::TypeWideningPreview
712                    | TableFeature::V2Checkpoint
713                    | TableFeature::VacuumProtocolCheck
714                    | TableFeature::VariantType
715                    | TableFeature::VariantTypePreview
716                    | TableFeature::VariantShreddingPreview => {
717                        (Some(feature.clone()), Some(feature))
718                    }
719
720                    // Unknown features
721                    TableFeature::Unknown(_) => (None, None),
722                }
723            }
724            None => (None, None),
725        }
726    }
727}
728
/// Storage type of deletion vector
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq, Default)]
pub enum StorageType {
    /// Stored at relative path derived from a UUID.
    #[serde(rename = "u")]
    #[default]
    UuidRelativePath,
    /// Stored as inline string.
    #[serde(rename = "i")]
    Inline,
    /// Stored at an absolute path.
    #[serde(rename = "p")]
    AbsolutePath,
}
743
744impl FromStr for StorageType {
745    type Err = Error;
746
747    fn from_str(s: &str) -> Result<Self, Self::Err> {
748        match s {
749            "u" => Ok(Self::UuidRelativePath),
750            "i" => Ok(Self::Inline),
751            "p" => Ok(Self::AbsolutePath),
752            _ => Err(Error::DeletionVector(format!(
753                "Unknown storage format: '{s}'."
754            ))),
755        }
756    }
757}
758
759impl AsRef<str> for StorageType {
760    fn as_ref(&self) -> &str {
761        match self {
762            Self::UuidRelativePath => "u",
763            Self::Inline => "i",
764            Self::AbsolutePath => "p",
765        }
766    }
767}
768
769impl Display for StorageType {
770    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
771        write!(f, "{}", self.as_ref())
772    }
773}
774
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
/// Defines a deletion vector (DV): which rows of a data file are logically
/// removed, and where the DV bitmap itself is stored.
pub struct DeletionVectorDescriptor {
    /// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p'].
    pub storage_type: StorageType,

    /// Three format options are currently proposed:
    /// - If `storageType = 'u'` then `<random prefix - optional><base85 encoded uuid>`:
    ///   The deletion vector is stored in a file with a path relative to the data
    ///   directory of this Delta table, and the file name can be reconstructed from
    ///   the UUID. See Derived Fields for how to reconstruct the file name. The random
    ///   prefix is recovered as the extra characters before the (20 characters fixed length) uuid.
    /// - If `storageType = 'i'` then `<base85 encoded bytes>`: The deletion vector
    ///   is stored inline in the log. The format used is the `RoaringBitmapArray`
    ///   format also used when the DV is stored on disk and described in [Deletion Vector Format].
    /// - If `storageType = 'p'` then `<absolute path>`: The DV is stored in a file with an
    ///   absolute path given by this path, which has the same format as the `path` field
    ///   in the `add`/`remove` actions.
    ///
    /// [Deletion Vector Format]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Deletion-Vector-Format
    pub path_or_inline_dv: String,

    /// Start of the data for this DV in number of bytes from the beginning of the file it is stored in.
    /// Always None (absent in JSON) when `storageType = 'i'`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub offset: Option<i32>,

    /// Size of the serialized DV in bytes (raw data size, i.e. before base85 encoding, if inline).
    pub size_in_bytes: i32,

    /// Number of rows the given DV logically removes from the file.
    pub cardinality: i64,
}
809
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "camelCase")]
/// Defines an add action: a logical file that is part of the table at a version.
///
/// NOTE(review): unlike [`Remove`], most `Option` fields here (e.g. `stats`,
/// `tags`, `base_row_id`) do not carry `skip_serializing_if`, so `None`
/// serializes as an explicit JSON `null` — confirm this asymmetry is intended.
pub struct Add {
    /// A relative path to a data file from the root of the table or an absolute path to a file
    /// that should be added to the table. The path is a URI as specified by
    /// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path.
    ///
    /// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt
    #[serde(with = "serde_path")]
    pub path: String,

    /// A map from partition column to value for this logical file.
    pub partition_values: HashMap<String, Option<String>>,

    /// The size of this data file in bytes
    pub size: i64,

    /// The time this logical file was created, as milliseconds since the epoch.
    pub modification_time: i64,

    /// When `false` the logical file must already be present in the table or the records
    /// in the added file must be contained in one or more remove actions in the same version.
    pub data_change: bool,

    /// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file.
    ///
    /// Stored as a raw JSON string, not a parsed structure.
    ///
    /// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
    pub stats: Option<String>,

    /// Map containing metadata about this logical file.
    pub tags: Option<HashMap<String, Option<String>>>,

    #[serde(skip_serializing_if = "Option::is_none")]
    /// Information about deletion vector (DV) associated with this add action
    pub deletion_vector: Option<DeletionVectorDescriptor>,

    /// Default generated Row ID of the first row in the file. The default generated Row IDs
    /// of the other rows in the file can be reconstructed by adding the physical index of the
    /// row within the file to the base Row ID
    pub base_row_id: Option<i64>,

    /// First commit version in which an add action with the same path was committed to the table.
    pub default_row_commit_version: Option<i64>,

    /// The name of the clustering implementation
    pub clustering_provider: Option<String>,
}
858
/// Represents a tombstone (deleted file) in the Delta log.
///
/// Mirrors [`Add`]; `partition_values`, `size`, and `tags` are only guaranteed
/// to be present when `extended_file_metadata` is `true`.
// NOTE(review): `Eq` is derived without a derived `PartialEq`; this only
// compiles if a manual `PartialEq` impl exists elsewhere in the crate — verify.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub struct Remove {
    /// A relative path to a data file from the root of the table or an absolute path to a file
    /// that should be added to the table. The path is a URI as specified by
    /// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path.
    ///
    /// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt
    #[serde(with = "serde_path")]
    pub path: String,

    /// When `false` the removal does not change the underlying data of the table
    /// (e.g. compaction); the removed records remain reachable through other
    /// logical files in the same version.
    pub data_change: bool,

    /// The time this file was marked as removed, as milliseconds since the epoch.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_timestamp: Option<i64>,

    /// When true the fields `partition_values`, `size`, and `tags` are present
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extended_file_metadata: Option<bool>,

    /// A map from partition column to value for this logical file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub partition_values: Option<HashMap<String, Option<String>>>,

    /// The size of this data file in bytes
    #[serde(skip_serializing_if = "Option::is_none")]
    pub size: Option<i64>,

    /// Map containing metadata about this logical file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,

    /// Information about deletion vector (DV) associated with this file at the
    /// time it was removed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_vector: Option<DeletionVectorDescriptor>,

    /// Default generated Row ID of the first row in the file. The default generated Row IDs
    /// of the other rows in the file can be reconstructed by adding the physical index of the
    /// row within the file to the base Row ID
    #[serde(skip_serializing_if = "Option::is_none")]
    pub base_row_id: Option<i64>,

    /// First commit version in which an add action with the same path was committed to the table.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_row_commit_version: Option<i64>,
}
909
/// Delta AddCDCFile action that describes a parquet CDC (change data capture) data file.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct AddCDCFile {
    /// A relative path, from the root of the table, or an
    /// absolute path to a CDC file. Percent-encoded on the wire via [`serde_path`].
    #[serde(with = "serde_path")]
    pub path: String,

    /// The size of this file in bytes
    pub size: i64,

    /// A map from partition column to value for this file
    pub partition_values: HashMap<String, Option<String>>,

    /// Should always be set to false because CDC files do not change the
    /// underlying data of the table.
    pub data_change: bool,

    /// Map containing metadata about this file
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
}
932
/// Action used by streaming systems to track progress using application-specific versions to
/// enable idempotency.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Transaction {
    /// A unique identifier for the application performing the transaction.
    pub app_id: String,

    /// An application-specific numeric identifier for this transaction.
    pub version: i64,

    /// The time when this transaction action was created in milliseconds since the Unix epoch.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_updated: Option<i64>,
}
948
949impl Transaction {
950    /// Create a new application transactions. See [`Txn`] for details.
951    pub fn new(app_id: impl ToString, version: i64) -> Self {
952        Self::new_with_last_update(app_id, version, None)
953    }
954
955    /// Create a new application transactions. See [`Txn`] for details.
956    pub fn new_with_last_update(
957        app_id: impl ToString,
958        version: i64,
959        last_updated: Option<i64>,
960    ) -> Self {
961        Transaction {
962            app_id: app_id.to_string(),
963            version,
964            last_updated,
965        }
966    }
967}
968
/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored.
/// However the reference implementation as well as delta-rs store useful information that may for instance
/// allow us to be more permissive in commit conflict resolution.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct CommitInfo {
    /// Timestamp in millis when the commit was created
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<i64>,

    /// Id of the user invoking the commit
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,

    /// Name of the user invoking the commit
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_name: Option<String>,

    /// The name of the operation performed by the commit (e.g. "WRITE")
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation: Option<String>,

    /// Parameters used for table operation
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_parameters: Option<HashMap<String, serde_json::Value>>,

    /// Version of the table when the operation was started
    #[serde(skip_serializing_if = "Option::is_none")]
    pub read_version: Option<i64>,

    /// The isolation level of the commit
    #[serde(skip_serializing_if = "Option::is_none")]
    pub isolation_level: Option<IsolationLevel>,

    /// Whether the commit was a blind append, i.e. it only added new data
    /// without reading existing table state — presumably used for conflict
    /// resolution; verify against the reference implementation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_blind_append: Option<bool>,

    /// Delta engine which created the commit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub engine_info: Option<String>,

    /// Additional provenance information for the commit; any unrecognized
    /// top-level keys are captured here via `#[serde(flatten)]`.
    #[serde(flatten, default)]
    pub info: HashMap<String, serde_json::Value>,

    /// User defined metadata
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_metadata: Option<String>,
}
1019
/// The domain metadata action contains a configuration (string) for a named metadata domain
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DomainMetadata {
    /// Identifier for this domain (system or user-provided)
    pub domain: String,

    /// String containing configuration for the metadata domain
    pub configuration: String,

    /// When `true` the action serves as a tombstone, deleting the domain's
    /// configuration rather than setting it.
    pub removed: bool,
}
1033
// No `rename_all = "camelCase"` here: both field names are single words, so
// the serialized form is unaffected.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
/// This action is only allowed in checkpoints following V2 spec. It describes the details about the checkpoint.
pub struct CheckpointMetadata {
    /// The flavor of the V2 checkpoint. Allowed values: "flat".
    pub flavor: String,

    /// Map containing any additional metadata about the v2 spec checkpoint.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
}
1044
/// The sidecar action references a sidecar file which provides some of the checkpoint's file actions.
/// This action is only allowed in checkpoints following V2 spec.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Sidecar {
    /// The name of the sidecar file (not a path).
    /// The file must reside in the _delta_log/_sidecars directory.
    pub file_name: String,

    /// The size of the sidecar file in bytes
    pub size_in_bytes: i64,

    /// The time this sidecar file was created, as milliseconds since the epoch.
    pub modification_time: i64,

    /// Type of sidecar. Valid values are: "fileaction".
    /// This could be extended in future to allow different kinds of sidecars.
    // renamed because `type` is a Rust keyword
    #[serde(rename = "type")]
    pub sidecar_type: String,

    /// Map containing any additional metadata about the checkpoint sidecar file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
}
1069
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
/// The isolation level applied during transaction
#[derive(Default)]
pub enum IsolationLevel {
    /// The strongest isolation level. It ensures that committed write operations
    /// and all reads are Serializable. Operations are allowed as long as there
    /// exists a serial sequence of executing them one-at-a-time that generates
    /// the same outcome as that seen in the table. For the write operations,
    /// the serial sequence is exactly the same as that seen in the table’s history.
    #[default]
    Serializable,

    /// A weaker isolation level than Serializable. It ensures only that the write
    /// operations (that is, not reads) are serializable. However, this is still stronger
    /// than Snapshot isolation. The reference implementation uses WriteSerializable
    /// as its default for writes, but note that this enum's `Default` is
    /// [`Self::Serializable`] (see the comment below this enum).
    WriteSerializable,

    /// SnapshotIsolation is a guarantee that all reads made in a transaction will see a consistent
    /// snapshot of the database (in practice it reads the last committed values that existed at the
    /// time it started), and the transaction itself will successfully commit only if no updates
    /// it has made conflict with any concurrent updates made since that snapshot.
    SnapshotIsolation,
}
1094
1095// Spark assumes Serializable as default isolation level
1096// https://github.com/delta-io/delta/blob/abb171c8401200e7772b27e3be6ea8682528ac72/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala#L1023
1097
1098impl AsRef<str> for IsolationLevel {
1099    fn as_ref(&self) -> &str {
1100        match self {
1101            Self::Serializable => "Serializable",
1102            Self::WriteSerializable => "WriteSerializable",
1103            Self::SnapshotIsolation => "SnapshotIsolation",
1104        }
1105    }
1106}
1107
1108impl FromStr for IsolationLevel {
1109    type Err = Error;
1110
1111    fn from_str(s: &str) -> Result<Self, Self::Err> {
1112        match s.to_ascii_lowercase().as_str() {
1113            "serializable" => Ok(Self::Serializable),
1114            "writeserializable" | "write_serializable" => Ok(Self::WriteSerializable),
1115            "snapshotisolation" | "snapshot_isolation" => Ok(Self::SnapshotIsolation),
1116            _ => Err(Error::Generic("Invalid string for IsolationLevel".into())),
1117        }
1118    }
1119}
1120
/// Serde helpers that percent-encode/decode the `path` field of file actions
/// (`add`/`remove`/`cdc`), used via `#[serde(with = "serde_path")]`.
pub(crate) mod serde_path {
    use std::str::Utf8Error;

    use percent_encoding::{AsciiSet, CONTROLS, percent_decode_str, percent_encode};
    use serde::{self, Deserialize, Deserializer, Serialize, Serializer};

    /// Deserialize a percent-encoded path string, decoding it to its raw form.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        decode_path(&s).map_err(serde::de::Error::custom)
    }

    /// Serialize a raw path string, percent-encoding the characters in [`INVALID`].
    pub fn serialize<S>(value: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let encoded = encode_path(value);
        String::serialize(&encoded, serializer)
    }

    /// The path delimiter (currently unused; kept for parity with object-store paths).
    pub const _DELIMITER: &str = "/";
    /// The path delimiter as a single byte
    pub const _DELIMITER_BYTE: u8 = _DELIMITER.as_bytes()[0];

    /// Characters we want to encode.
    ///
    /// NOTE: changing this set changes the on-disk log representation of paths
    /// — it must stay compatible with what other Delta readers expect.
    const INVALID: &AsciiSet = &CONTROLS
        // The delimiter we are reserving for internal hierarchy
        // .add(DELIMITER_BYTE)
        // Characters AWS recommends avoiding for object keys
        // https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
        .add(b'\\')
        .add(b'{')
        .add(b'^')
        .add(b'}')
        .add(b'%')
        .add(b'`')
        .add(b']')
        .add(b'"')
        .add(b'>')
        .add(b'[')
        // .add(b'~')
        .add(b'<')
        .add(b'#')
        .add(b'|')
        // Characters Google Cloud Storage recommends avoiding for object names
        // https://cloud.google.com/storage/docs/naming-objects
        .add(b'\r')
        .add(b'\n')
        .add(b'*')
        .add(b'?');

    /// Percent-encode every byte of `path` that is in [`INVALID`].
    fn encode_path(path: &str) -> String {
        percent_encode(path.as_bytes(), INVALID).to_string()
    }

    /// Percent-decode `path`; fails if the decoded bytes are not valid UTF-8.
    pub fn decode_path(path: &str) -> Result<String, Utf8Error> {
        Ok(percent_decode_str(path).decode_utf8()?.to_string())
    }
}
1182
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::PrimitiveType;

    /// A bare JSON string such as `"string"` must deserialize into the
    /// corresponding [`PrimitiveType`] variant.
    ///
    /// Previously this test only printed the parsed value; it now asserts it.
    #[test]
    fn test_primitive() {
        let types: PrimitiveType = serde_json::from_str("\"string\"").unwrap();
        assert!(matches!(types, PrimitiveType::String));
    }

    /// Protocol JSON should deserialize with known feature names mapped to
    /// their variants and unrecognized names preserved as
    /// [`TableFeature::Unknown`].
    #[test]
    fn test_deserialize_protocol() {
        // protocol json data
        let raw = serde_json::json!(
            {
              "minReaderVersion": 3,
              "minWriterVersion": 7,
              "readerFeatures": ["catalogOwned"],
              "writerFeatures": ["catalogOwned", "invariants", "appendOnly"]
            }
        );
        let protocol: Protocol = serde_json::from_value(raw).unwrap();
        assert_eq!(protocol.min_reader_version(), 3);
        assert_eq!(protocol.min_writer_version(), 7);
        assert_eq!(
            protocol.reader_features(),
            Some(vec![TableFeature::Unknown("catalogOwned".to_owned())].as_slice())
        );
        assert_eq!(
            protocol.writer_features(),
            Some(
                vec![
                    TableFeature::Unknown("catalogOwned".to_owned()),
                    TableFeature::Invariants,
                    TableFeature::AppendOnly
                ]
                .as_slice()
            )
        );
    }
}