1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use async_trait::async_trait;
use deltalake::DeltaTableError;

use crate::protocol::action::{FileBuilder, MetadataBuilder, ProtocolBuilder};
use crate::protocol::table::{
    TableMetadata, TableVersionNumber, UnsignedTableData, Version, VersionRange,
};

use super::{TableReader, TableReaderError};

/// TableReader implementation for the Delta Lake format.
#[derive(Debug, Clone, PartialEq)]
pub struct DeltaTableReader;

impl DeltaTableReader {
    /// Create a new instance of the Delta Lake TableReader.
    pub fn new() -> Self {
        Self {}
    }
}

impl Default for DeltaTableReader {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl TableReader for DeltaTableReader {
    async fn get_table_version(
        &self,
        storage_path: &str,
        version: Version,
    ) -> Result<TableVersionNumber, TableReaderError> {
        match version {
            Version::Latest => {
                let delta_table = deltalake::open_table(storage_path).await?;
                Ok(delta_table.version() as u64)
            }
            Version::Timestamp(ts) => {
                let delta_table =
                    deltalake::open_table_with_ds(storage_path, ts.to_rfc3339()).await?;
                Ok(delta_table.version() as u64)
            }
        }
    }

    async fn get_table_metadata(
        &self,
        storage_path: &str,
    ) -> Result<TableMetadata, TableReaderError> {
        let delta_table = deltalake::open_table(storage_path).await?;

        let min_reader_version = delta_table.get_min_reader_version() as u32;
        let table_protocol = ProtocolBuilder::new()
            .min_reader_version(min_reader_version)
            .build();

        let metadata = delta_table.get_metadata()?;
        let schema = serde_json::to_string(&delta_table.get_schema()?).unwrap();
        let configuration = metadata
            .configuration
            .clone()
            .into_iter()
            .map(|c| (c.0.clone(), c.1.unwrap_or_default()))
            .collect();
        let table_metadata = MetadataBuilder::new(metadata.id.clone(), schema)
            .partition_columns(metadata.partition_columns.clone())
            .configuration(configuration)
            .build();

        Ok(TableMetadata {
            version: delta_table.version() as u64,
            protocol: table_protocol,
            metadata: table_metadata,
        })
    }

    async fn get_table_data(
        &self,
        storage_path: &str,
        version: u64,
        _limit: Option<u64>,
        _predicates: Option<String>,
    ) -> Result<UnsignedTableData, TableReaderError> {
        let mut delta_table = deltalake::open_table(storage_path).await?;
        delta_table.load_version(version as i64).await?;

        let min_reader_version = delta_table.get_min_reader_version() as u32;
        let table_protocol = ProtocolBuilder::new()
            .min_reader_version(min_reader_version)
            .build();

        let metadata = delta_table.get_metadata()?;
        let schema = serde_json::to_string(&delta_table.get_schema()?).unwrap();
        let configuration = metadata
            .configuration
            .clone()
            .into_iter()
            .map(|c| (c.0.clone(), c.1.unwrap_or_default()))
            .collect();
        let table_metadata = MetadataBuilder::new(metadata.id.clone(), schema)
            .partition_columns(metadata.partition_columns.clone())
            .configuration(configuration)
            .build();

        let mut table_files = vec![];
        for file in delta_table.get_state().files() {
            let url = format!("{}/{}", storage_path, file.path);
            let f = FileBuilder::new(url, "").build();
            table_files.push(f.into());
        }

        Ok(UnsignedTableData {
            version: delta_table.version() as u64,
            protocol: table_protocol,
            metadata: table_metadata,
            data: table_files,
        })
    }

    async fn get_table_changes(
        &self,
        _storage_path: &str,
        _range: VersionRange,
    ) -> Result<UnsignedTableData, TableReaderError> {
        return Err(TableReaderError::Other);
    }
}

impl From<DeltaTableError> for TableReaderError {
    fn from(_value: DeltaTableError) -> Self {
        // TODO: meaningful error handling
        TableReaderError::Other
    }
}