iceberg_rust/object_store/
store.rs

/*! Helpers for interacting with object storage
*/
3use async_trait::async_trait;
4use iceberg_rust_spec::{
5    tabular::{TabularMetadata, TabularMetadataRef},
6    util::strip_prefix,
7};
8use object_store::{Attributes, ObjectStore, PutOptions, TagSet};
9
10use crate::error::Error;
11use flate2::read::GzDecoder;
12use lazy_static::lazy_static;
13use regex::Regex;
14use std::io::Read;
15
16/// Simplify interaction with iceberg files
17#[async_trait]
18pub trait IcebergStore {
19    /// Get metadata file from object_storage
20    async fn get_metadata(&self, location: &str) -> Result<TabularMetadata, Error>;
21    /// Write metadata file to object_storage
22    async fn put_metadata(
23        &self,
24        location: &str,
25        metadata: TabularMetadataRef<'_>,
26    ) -> Result<(), Error>;
27    /// Write version-hint file to object_storage
28    async fn put_version_hint(&self, location: &str) -> Result<(), Error>;
29}
30
31#[async_trait]
32impl<T: ObjectStore> IcebergStore for T {
33    async fn get_metadata(&self, location: &str) -> Result<TabularMetadata, Error> {
34        let bytes = self
35            .get(&strip_prefix(location).into())
36            .await?
37            .bytes()
38            .await?;
39
40        parse_metadata(location, &bytes)
41    }
42
43    async fn put_metadata(
44        &self,
45        location: &str,
46        metadata: TabularMetadataRef<'_>,
47    ) -> Result<(), Error> {
48        self.put(
49            &strip_prefix(location).into(),
50            serde_json::to_vec(&metadata)?.into(),
51        )
52        .await?;
53
54        Ok(())
55    }
56
57    async fn put_version_hint(&self, location: &str) -> Result<(), Error> {
58        self.put_opts(
59            &version_hint_path(&strip_prefix(location))
60                .ok_or(Error::InvalidFormat(format!(
61                    "Path for version-hint for {location}"
62                )))?
63                .into(),
64            version_hint_content(location).into(),
65            PutOptions {
66                mode: object_store::PutMode::Overwrite,
67                tags: TagSet::default(),
68                attributes: Attributes::default(),
69                extensions: Default::default(),
70            },
71        )
72        .await?;
73
74        Ok(())
75    }
76}
77
/// Builds the path of the `version-hint.text` file that sits in the same directory as
/// `original`.
///
/// Returns `None` when `original` has no parent directory (for example the empty string) or
/// when the resulting path is not valid UTF-8.
fn version_hint_path(original: &str) -> Option<String> {
    let directory = std::path::Path::new(original).parent()?;
    let hint_file = directory.join("version-hint.text");
    hint_file.to_str().map(str::to_owned)
}
87
88lazy_static! {
89    static ref SUPPORTED_METADATA_FILE_FORMATS: Vec<Regex> = vec![
90        // The standard metastore format https://iceberg.apache.org/spec/#metastore-tables
91        Regex::new(
92            r"^(?<version>[0-9]{5}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}).(?:gz.)?metadata.json$"
93        )
94        .unwrap(),
95        // The legacy file-system format https://iceberg.apache.org/spec/#file-system-tables
96        Regex::new(r"^v(?<version>[0-9]+).metadata.json$").unwrap(),
97    ];
98}
99
100/// Given a full path to a metadata file, extract an appropriate version hint that other readers
101/// without access to the catalog can parse.
102pub fn version_hint_content(original: &str) -> String {
103    original
104        .split("/")
105        .last()
106        .and_then(|filename| {
107            SUPPORTED_METADATA_FILE_FORMATS
108                .iter()
109                .filter_map(|regex| {
110                    regex.captures(filename).and_then(|capture| {
111                        capture
112                            .name("version")
113                            .and_then(|m| m.as_str().parse().ok())
114                    })
115                })
116                .next()
117        })
118        .unwrap_or(original.to_string())
119}
120
121fn parse_metadata(location: &str, bytes: &[u8]) -> Result<TabularMetadata, Error> {
122    if location.ends_with(".gz.metadata.json") {
123        let mut decoder = GzDecoder::new(bytes);
124        let mut decompressed_data = Vec::new();
125        decoder
126            .read_to_end(&mut decompressed_data)
127            .map_err(|e| Error::Decompress(e.to_string()))?;
128        serde_json::from_slice(&decompressed_data).map_err(Error::from)
129    } else {
130        serde_json::from_slice(bytes).map_err(Error::from)
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;
    use std::io::Write;

    /// UUID stored in [`TABLE_METADATA_JSON`].
    const TABLE_UUID: &str = "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94";

    /// UUID stored in [`VIEW_METADATA_JSON`].
    const VIEW_UUID: &str = "fa6506c3-7681-40c8-86dc-e36561f83385";

    /// Table metadata document shared by the plain and gzipped table parsing tests.
    const TABLE_METADATA_JSON: &str = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 1,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 1,
            "last-partition-id": 1,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [],
            "default-sort-order-id": 0
        }
    "#;

    /// View metadata document shared by the plain and gzipped view parsing tests.
    const VIEW_METADATA_JSON: &str = r#"
        {
        "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
        "format-version" : 1,
        "location" : "s3://bucket/warehouse/default.db/event_agg",
        "current-version-id" : 1,
        "properties" : {
            "comment" : "Daily event counts"
        },
        "versions" : [ {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
            "operation" : "create",
            "engine-name" : "Spark",
            "engineVersion" : "3.3.2"
            },
            "representations" : [ {
            "type" : "sql",
            "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
            "dialect" : "spark"
            } ]
        } ],
        "schemas": [ {
            "schema-id": 1,
            "type" : "struct",
            "fields" : [ {
            "id" : 1,
            "name" : "event_count",
            "required" : false,
            "type" : "int",
            "doc" : "Count of events"
            }, {
            "id" : 2,
            "name" : "event_date",
            "required" : false,
            "type" : "date"
            } ]
        } ],
        "version-log" : [ {
            "timestamp-ms" : 1573518431292,
            "version-id" : 1
        } ]
        }
    "#;

    /// Gzip-compresses `data` with the default compression level.
    fn gzip(data: &[u8]) -> Vec<u8> {
        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
        encoder.write_all(data).unwrap();
        encoder.finish().unwrap()
    }

    /// Asserts that `metadata` is the table variant carrying [`TABLE_UUID`].
    fn assert_table_metadata(metadata: TabularMetadata) {
        match metadata {
            TabularMetadata::Table(table_metadata) => {
                assert_eq!(table_metadata.table_uuid.to_string(), TABLE_UUID);
            }
            _ => panic!("Expected TabularMetadata::Table variant"),
        }
    }

    /// Asserts that `metadata` is the view variant carrying [`VIEW_UUID`].
    fn assert_view_metadata(metadata: TabularMetadata) {
        match metadata {
            TabularMetadata::View(view_metadata) => {
                assert_eq!(view_metadata.view_uuid.to_string(), VIEW_UUID);
            }
            _ => panic!("Expected TabularMetadata::View variant"),
        }
    }

    #[test]
    fn test_version_hint_path_normal_case() {
        let input = "/path/to/metadata/v1.metadata.json";
        let expected = "/path/to/metadata/version-hint.text";
        assert_eq!(version_hint_path(input), Some(expected.to_string()));
    }

    #[test]
    fn test_version_hint_path_relative() {
        let input = "path/to/metadata/v1.metadata.json";
        let expected = "path/to/metadata/version-hint.text";
        assert_eq!(version_hint_path(input), Some(expected.to_string()));
    }

    #[test]
    fn test_version_hint_path_single_file() {
        let input = "file.json";
        let expected = "version-hint.text";
        assert_eq!(version_hint_path(input), Some(expected.to_string()));
    }

    #[test]
    fn test_version_hint_path_empty_string() {
        let input = "";
        assert_eq!(version_hint_path(input), None);
    }

    #[test]
    fn test_version_hint_path_with_special_characters() {
        let input = "/path/with spaces/and#special@chars/file.json";
        let expected = "/path/with spaces/and#special@chars/version-hint.text";
        assert_eq!(version_hint_path(input), Some(expected.to_string()));
    }

    #[test]
    fn test_version_hint_path_with_multiple_extensions() {
        let input = "/path/to/file.with.multiple.extensions.json";
        let expected = "/path/to/version-hint.text";
        assert_eq!(version_hint_path(input), Some(expected.to_string()));
    }

    #[rstest]
    #[case::file_format("/path/to/metadata/v2.metadata.json", "2")]
    #[case::metastore_format_no_gzip(
        "/path/to/metadata/00004-3f569e94-5601-48f3-9199-8d71df4ea7b0.metadata.json",
        "00004-3f569e94-5601-48f3-9199-8d71df4ea7b0"
    )]
    #[case::metastore_format_with_gzip(
        "/path/to/metadata/00004-3f569e94-5601-48f3-9199-8d71df4ea7b0.gz.metadata.json",
        "00004-3f569e94-5601-48f3-9199-8d71df4ea7b0"
    )]
    #[test]
    fn test_version_hint_content(#[case] input: &str, #[case] expected: &str) {
        assert_eq!(version_hint_content(input), expected);
    }

    #[test]
    fn test_parse_metadata_table_plain_json() {
        let location = "/path/to/metadata/v1.metadata.json";

        let metadata = parse_metadata(location, TABLE_METADATA_JSON.as_bytes())
            .expect("plain table metadata should parse");

        assert_table_metadata(metadata);
    }

    #[test]
    fn test_parse_metadata_table_gzipped_json() {
        let location = "/path/to/metadata/v1.gz.metadata.json";
        let compressed_data = gzip(TABLE_METADATA_JSON.as_bytes());

        let metadata = parse_metadata(location, &compressed_data)
            .expect("gzipped table metadata should parse");

        assert_table_metadata(metadata);
    }

    #[test]
    fn test_parse_metadata_view_plain_json() {
        let location = "/path/to/metadata/v1.metadata.json";

        let metadata = parse_metadata(location, VIEW_METADATA_JSON.as_bytes())
            .expect("plain view metadata should parse");

        assert_view_metadata(metadata);
    }

    #[test]
    fn test_parse_metadata_view_gzipped_json() {
        let location = "/path/to/metadata/v1.gz.metadata.json";
        let compressed_data = gzip(VIEW_METADATA_JSON.as_bytes());

        let metadata = parse_metadata(location, &compressed_data)
            .expect("gzipped view metadata should parse");

        assert_view_metadata(metadata);
    }

    #[test]
    fn test_parse_metadata_invalid_json() {
        let location = "/path/to/metadata/v1.metadata.json";
        let invalid_json_data = r#"{"key": "value""#;

        let result = parse_metadata(location, invalid_json_data.as_bytes());
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_metadata_invalid_gzipped_data() {
        let location = "/path/to/metadata/v1.gz.metadata.json";
        let invalid_gzipped_data = b"not a valid gzip";

        let result = parse_metadata(location, invalid_gzipped_data);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_metadata_empty_bytes() {
        let location = "/path/to/metadata/v1.metadata.json";
        let empty_bytes: &[u8] = &[];

        let result = parse_metadata(location, empty_bytes);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_metadata_gzipped_empty_bytes() {
        let location = "/path/to/metadata/v1.gz.metadata.json";
        let empty_gzipped_bytes: &[u8] = &[];

        let result = parse_metadata(location, empty_gzipped_bytes);
        assert!(result.is_err());
    }
}