1use async_trait::async_trait;
4use iceberg_rust_spec::{
5 tabular::{TabularMetadata, TabularMetadataRef},
6 util::strip_prefix,
7};
8use object_store::{Attributes, ObjectStore, PutOptions, TagSet};
9
10use crate::error::Error;
11use flate2::read::GzDecoder;
12use lazy_static::lazy_static;
13use regex::Regex;
14use std::io::Read;
15
16#[async_trait]
18pub trait IcebergStore {
19 async fn get_metadata(&self, location: &str) -> Result<TabularMetadata, Error>;
21 async fn put_metadata(
23 &self,
24 location: &str,
25 metadata: TabularMetadataRef<'_>,
26 ) -> Result<(), Error>;
27 async fn put_version_hint(&self, location: &str) -> Result<(), Error>;
29}
30
31#[async_trait]
32impl<T: ObjectStore> IcebergStore for T {
33 async fn get_metadata(&self, location: &str) -> Result<TabularMetadata, Error> {
34 let bytes = self
35 .get(&strip_prefix(location).into())
36 .await?
37 .bytes()
38 .await?;
39
40 parse_metadata(location, &bytes)
41 }
42
43 async fn put_metadata(
44 &self,
45 location: &str,
46 metadata: TabularMetadataRef<'_>,
47 ) -> Result<(), Error> {
48 self.put(
49 &strip_prefix(location).into(),
50 serde_json::to_vec(&metadata)?.into(),
51 )
52 .await?;
53
54 Ok(())
55 }
56
57 async fn put_version_hint(&self, location: &str) -> Result<(), Error> {
58 self.put_opts(
59 &version_hint_path(&strip_prefix(location))
60 .ok_or(Error::InvalidFormat(format!(
61 "Path for version-hint for {location}"
62 )))?
63 .into(),
64 version_hint_content(location).into(),
65 PutOptions {
66 mode: object_store::PutMode::Overwrite,
67 tags: TagSet::default(),
68 attributes: Attributes::default(),
69 extensions: Default::default(),
70 },
71 )
72 .await?;
73
74 Ok(())
75 }
76}
77
/// Returns the location of the `version-hint.text` file in the same directory as the
/// metadata file at `original`.
///
/// Object-store keys always use `/` as the separator. This therefore works on the
/// string directly instead of `std::path::Path`, whose `join` would insert `\` on
/// Windows.
///
/// Trailing slashes are ignored. A bare file name (no `/`) yields
/// `"version-hint.text"`. Returns `None` when `original` is empty or consists only of
/// slashes.
fn version_hint_path(original: &str) -> Option<String> {
    let trimmed = original.trim_end_matches('/');
    if trimmed.is_empty() {
        return None;
    }

    Some(match trimmed.rsplit_once('/') {
        // Collapse repeated separators before the file name ("a//b" -> "a").
        Some((dir, _file)) => format!("{}/version-hint.text", dir.trim_end_matches('/')),
        None => "version-hint.text".to_string(),
    })
}
87
88lazy_static! {
89 static ref SUPPORTED_METADATA_FILE_FORMATS: Vec<Regex> = vec![
90 Regex::new(
92 r"^(?<version>[0-9]{5}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}).(?:gz.)?metadata.json$"
93 )
94 .unwrap(),
95 Regex::new(r"^v(?<version>[0-9]+).metadata.json$").unwrap(),
97 ];
98}
99
100pub fn version_hint_content(original: &str) -> String {
103 original
104 .split("/")
105 .last()
106 .and_then(|filename| {
107 SUPPORTED_METADATA_FILE_FORMATS
108 .iter()
109 .filter_map(|regex| {
110 regex.captures(filename).and_then(|capture| {
111 capture
112 .name("version")
113 .and_then(|m| m.as_str().parse().ok())
114 })
115 })
116 .next()
117 })
118 .unwrap_or(original.to_string())
119}
120
121fn parse_metadata(location: &str, bytes: &[u8]) -> Result<TabularMetadata, Error> {
122 if location.ends_with(".gz.metadata.json") {
123 let mut decoder = GzDecoder::new(bytes);
124 let mut decompressed_data = Vec::new();
125 decoder
126 .read_to_end(&mut decompressed_data)
127 .map_err(|e| Error::Decompress(e.to_string()))?;
128 serde_json::from_slice(&decompressed_data).map_err(Error::from)
129 } else {
130 serde_json::from_slice(bytes).map_err(Error::from)
131 }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;
    use std::io::Write;

    /// Minimal format-version 2 table metadata shared by the table parsing tests.
    const TABLE_METADATA_JSON: &str = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 1,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 1,
            "last-partition-id": 1,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [],
            "default-sort-order-id": 0
        }
    "#;

    /// Minimal view metadata shared by the view parsing tests.
    const VIEW_METADATA_JSON: &str = r#"
        {
            "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
            "format-version" : 1,
            "location" : "s3://bucket/warehouse/default.db/event_agg",
            "current-version-id" : 1,
            "properties" : {
                "comment" : "Daily event counts"
            },
            "versions" : [ {
                "version-id" : 1,
                "timestamp-ms" : 1573518431292,
                "schema-id" : 1,
                "default-catalog" : "prod",
                "default-namespace" : [ "default" ],
                "summary" : {
                    "operation" : "create",
                    "engine-name" : "Spark",
                    "engineVersion" : "3.3.2"
                },
                "representations" : [ {
                    "type" : "sql",
                    "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
                    "dialect" : "spark"
                } ]
            } ],
            "schemas": [ {
                "schema-id": 1,
                "type" : "struct",
                "fields" : [ {
                    "id" : 1,
                    "name" : "event_count",
                    "required" : false,
                    "type" : "int",
                    "doc" : "Count of events"
                }, {
                    "id" : 2,
                    "name" : "event_date",
                    "required" : false,
                    "type" : "date"
                } ]
            } ],
            "version-log" : [ {
                "timestamp-ms" : 1573518431292,
                "version-id" : 1
            } ]
        }
    "#;

    /// Gzip-compresses `data` in memory with the default compression level.
    fn gzip(data: &[u8]) -> Vec<u8> {
        let mut encoder =
            flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
        encoder.write_all(data).unwrap();
        encoder.finish().unwrap()
    }

    /// Panics unless `metadata` is table metadata whose UUID equals `expected`.
    fn assert_table_uuid(metadata: TabularMetadata, expected: &str) {
        match metadata {
            TabularMetadata::Table(table_metadata) => {
                assert_eq!(table_metadata.table_uuid.to_string(), expected)
            }
            _ => panic!("Expected TabularMetadata::Table variant"),
        }
    }

    /// Panics unless `metadata` is view metadata whose UUID equals `expected`.
    fn assert_view_uuid(metadata: TabularMetadata, expected: &str) {
        match metadata {
            TabularMetadata::View(view_metadata) => {
                assert_eq!(view_metadata.view_uuid.to_string(), expected)
            }
            _ => panic!("Expected TabularMetadata::View variant"),
        }
    }

    #[rstest]
    #[case::normal_case(
        "/path/to/metadata/v1.metadata.json",
        Some("/path/to/metadata/version-hint.text")
    )]
    #[case::relative(
        "path/to/metadata/v1.metadata.json",
        Some("path/to/metadata/version-hint.text")
    )]
    #[case::single_file("file.json", Some("version-hint.text"))]
    #[case::empty_string("", None)]
    #[case::with_special_characters(
        "/path/with spaces/and#special@chars/file.json",
        Some("/path/with spaces/and#special@chars/version-hint.text")
    )]
    #[case::with_multiple_extensions(
        "/path/to/file.with.multiple.extensions.json",
        Some("/path/to/version-hint.text")
    )]
    fn test_version_hint_path(#[case] input: &str, #[case] expected: Option<&str>) {
        assert_eq!(version_hint_path(input).as_deref(), expected);
    }

    #[rstest]
    #[case::file_format("/path/to/metadata/v2.metadata.json", "2")]
    #[case::metastore_format_no_gzip(
        "/path/to/metadata/00004-3f569e94-5601-48f3-9199-8d71df4ea7b0.metadata.json",
        "00004-3f569e94-5601-48f3-9199-8d71df4ea7b0"
    )]
    #[case::metastore_format_with_gzip(
        "/path/to/metadata/00004-3f569e94-5601-48f3-9199-8d71df4ea7b0.gz.metadata.json",
        "00004-3f569e94-5601-48f3-9199-8d71df4ea7b0"
    )]
    #[test]
    fn test_version_hint_content(#[case] input: &str, #[case] expected: &str) {
        assert_eq!(version_hint_content(input), expected);
    }

    #[test]
    fn test_parse_metadata_table_plain_json() {
        let metadata = parse_metadata(
            "/path/to/metadata/v1.metadata.json",
            TABLE_METADATA_JSON.as_bytes(),
        )
        .expect("plain table metadata should parse");
        assert_table_uuid(metadata, "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94");
    }

    #[test]
    fn test_parse_metadata_table_gzipped_json() {
        let compressed = gzip(TABLE_METADATA_JSON.as_bytes());
        let metadata = parse_metadata("/path/to/metadata/v1.gz.metadata.json", &compressed)
            .expect("gzipped table metadata should parse");
        assert_table_uuid(metadata, "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94");
    }

    #[test]
    fn test_parse_metadata_view_plain_json() {
        let metadata = parse_metadata(
            "/path/to/metadata/v1.metadata.json",
            VIEW_METADATA_JSON.as_bytes(),
        )
        .expect("plain view metadata should parse");
        assert_view_uuid(metadata, "fa6506c3-7681-40c8-86dc-e36561f83385");
    }

    #[test]
    fn test_parse_metadata_view_gzipped_json() {
        let compressed = gzip(VIEW_METADATA_JSON.as_bytes());
        let metadata = parse_metadata("/path/to/metadata/v1.gz.metadata.json", &compressed)
            .expect("gzipped view metadata should parse");
        assert_view_uuid(metadata, "fa6506c3-7681-40c8-86dc-e36561f83385");
    }

    #[test]
    fn test_parse_metadata_invalid_json() {
        let truncated = r#"{"key": "value""#;
        assert!(parse_metadata("/path/to/metadata/v1.metadata.json", truncated.as_bytes()).is_err());
    }

    #[test]
    fn test_parse_metadata_invalid_gzipped_data() {
        let not_gzip = b"not a valid gzip";
        assert!(parse_metadata("/path/to/metadata/v1.gz.metadata.json", not_gzip).is_err());
    }

    #[test]
    fn test_parse_metadata_empty_bytes() {
        let empty: &[u8] = &[];
        assert!(parse_metadata("/path/to/metadata/v1.metadata.json", empty).is_err());
    }

    #[test]
    fn test_parse_metadata_gzipped_empty_bytes() {
        let empty: &[u8] = &[];
        assert!(parse_metadata("/path/to/metadata/v1.gz.metadata.json", empty).is_err());
    }
}