amadeus-parquet 0.4.3

An Apache Parquet implementation in Rust.
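Documentation: as a minimal sketch of the reader API exercised by the test
suite below (the data.parquet path and the Row struct are hypothetical, not
part of the crate):

use amadeus_parquet::internal;
use internal::{
	errors::ParquetError, file::reader::{FileReader, SerializedFileReader}, record::Record
};

#[derive(Record, Debug)]
struct Row {
	id: i64,
}

fn main() -> Result<(), ParquetError> {
	// Open a Parquet file and collect its rows into the derived Row type.
	let file = std::fs::File::open("data.parquet").expect("file not found");
	let reader = SerializedFileReader::new(file)?;
	let rows: Vec<Row> = reader.get_row_iter(None)?.map(Result::unwrap).collect();
	println!("{:?}", rows);
	Ok(())
}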
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#![cfg(not(miri))]
#![allow(clippy::type_complexity)]

use std::{collections::HashMap, env, fs, path::PathBuf, str::FromStr};

use amadeus_parquet::internal;
use internal::{
	errors::ParquetError, file::reader::{FileReader, RowGroupReader, SerializedFileReader}, record::{types::Map, Record}
};

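// The structs below are compile-time checks of the derive macro: they are never
// constructed (hence #[allow(dead_code)]), and cover plain fields, renamed
// fields, generic parameters and empty structs.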
#[allow(dead_code)]
#[derive(Record)]
struct Abc {
	a: String,
}

#[allow(dead_code)]
#[derive(Record)]
struct Def {
	#[parquet(name = "!@£$%^&*(")]
	a: String,
}

// Left commented out deliberately: `#[parquet(name = ...)]` requires a string
// literal, so a numeric name does not compile.
// #[derive(Record)]
// struct Ghi {
// 	#[parquet(name = 123)]
// 	a: String,
// }

#[allow(dead_code)]
#[derive(Record)]
struct Jkl<M> {
	a: M,
}

#[allow(dead_code)]
#[derive(Record)]
struct Mno {}

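// Convenience constructors for expected values: `list!` builds a Vec from its
// arguments and `map!` builds a `Map` from (key, value) pairs.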
macro_rules! list {
	( $( $e:expr ), * ) => {
		{
			#[allow(unused_mut)]
			let mut result = Vec::new();
			$(
				result.push($e);
			)*
			result
		}
	}
}

macro_rules! map {
	( $( ($k:expr, $v:expr) ), * ) => {
		{
			#[allow(unused_mut)]
			let mut result = HashMap::new();
			$(
				result.insert($k, $v);
			)*
			Map::from(result)
		}
	}
}

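// Reads a deeply nested, non-nullable Impala-generated file (lists, maps and
// nested structs) into derived Record types and checks its single row.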
#[test]
fn test_file_reader_rows_nonnullable_derived() {
	#[derive(PartialEq, Record, Debug)]
	struct RowDerived {
		#[parquet(name = "ID")]
		id: i64,
		#[parquet(name = "Int_Array")]
		int_array: Vec<i32>,
		int_array_array: Vec<Vec<i32>>,
		#[parquet(name = "Int_Map")]
		int_map: Map<String, i32>,
		int_map_array: Vec<Map<String, i32>>,
		#[parquet(name = "nested_Struct")]
		nested_struct: RowDerivedInner,
	}

	#[derive(PartialEq, Record, Debug)]
	struct RowDerivedInner {
		a: i32,
		#[parquet(name = "B")]
		b: Vec<i32>,
		c: RowDerivedInnerInner,
		#[parquet(name = "G")]
		g: Map<String, ((Vec<f64>,),)>,
	}

	#[derive(PartialEq, Record, Debug)]
	struct RowDerivedInnerInner {
		#[parquet(name = "D")]
		d: Vec<Vec<RowDerivedInnerInnerInner>>,
	}

	#[derive(PartialEq, Record, Debug)]
	struct RowDerivedInnerInnerInner {
		e: i32,
		f: String,
	}

	let rows = test_file_reader_rows::<RowDerived>("nonnullable.impala.parquet", None).unwrap();

	let expected_rows: Vec<RowDerived> = vec![RowDerived {
		id: 8,
		int_array: list![-1],
		int_array_array: list![list![-1, -2], list![]],
		int_map: map![("k1".to_string(), -1)],
		int_map_array: list![map![], map![("k1".to_string(), 1i32)], map![], map![]],
		nested_struct: RowDerivedInner {
			a: -1,
			b: list![-1],
			c: RowDerivedInnerInner {
				d: list![list![RowDerivedInnerInnerInner {
					e: -1,
					f: "nonnullable".to_string()
				}]],
			},
			g: map![],
		},
	}];

	assert_eq!(rows, expected_rows);
}

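// Projection: a derived Record struct naming only a subset of the file's
// columns reads just those columns.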
#[test]
fn test_file_reader_rows_projection_derived() {
	#[derive(PartialEq, Record, Debug)]
	struct SparkSchema {
		c: f64,
		b: i32,
	}

	let rows = test_file_reader_rows::<SparkSchema>("nested_maps.snappy.parquet", None).unwrap();

	let expected_rows = vec![
		SparkSchema { c: 1.0, b: 1 },
		SparkSchema { c: 1.0, b: 1 },
		SparkSchema { c: 1.0, b: 1 },
		SparkSchema { c: 1.0, b: 1 },
		SparkSchema { c: 1.0, b: 1 },
		SparkSchema { c: 1.0, b: 1 },
	];

	assert_eq!(rows, expected_rows);
}

#[test]
fn test_file_reader_rows_projection_map_derived() {
	#[derive(PartialEq, Record, Debug)]
	struct SparkSchema {
		a: Option<Map<String, Option<Map<i32, bool>>>>,
	}

	let rows = test_file_reader_rows::<SparkSchema>("nested_maps.snappy.parquet", None).unwrap();

	let expected_rows = vec![
		SparkSchema {
			a: Some(map![("a".to_string(), Some(map![(1, true), (2, false)]))]),
		},
		SparkSchema {
			a: Some(map![("b".to_string(), Some(map![(1, true)]))]),
		},
		SparkSchema {
			a: Some(map![("c".to_string(), None)]),
		},
		SparkSchema {
			a: Some(map![("d".to_string(), Some(map![]))]),
		},
		SparkSchema {
			a: Some(map![("e".to_string(), Some(map![(1, true)]))]),
		},
		SparkSchema {
			a: Some(map![(
				"f".to_string(),
				Some(map![(3, true), (4, false), (5, true)])
			)]),
		},
	];

	assert_eq!(rows, expected_rows);
}

#[test]
fn test_file_reader_rows_projection_list_derived() {
	#[derive(PartialEq, Record, Debug)]
	struct SparkSchema {
		a: Option<Vec<Option<Vec<Option<Vec<Option<String>>>>>>>,
	}

	let rows = test_file_reader_rows::<SparkSchema>("nested_lists.snappy.parquet", None).unwrap();

	let expected_rows = vec![
		SparkSchema {
			a: Some(list![
				Some(list![
					Some(list![Some("a".to_string()), Some("b".to_string())]),
					Some(list![Some("c".to_string())])
				]),
				Some(list![None, Some(list![Some("d".to_string())])])
			]),
		},
		SparkSchema {
			a: Some(list![
				Some(list![
					Some(list![Some("a".to_string()), Some("b".to_string())]),
					Some(list![Some("c".to_string()), Some("d".to_string())])
				]),
				Some(list![None, Some(list![Some("e".to_string())])])
			]),
		},
		SparkSchema {
			a: Some(list![
				Some(list![
					Some(list![Some("a".to_string()), Some("b".to_string())]),
					Some(list![Some("c".to_string()), Some("d".to_string())]),
					Some(list![Some("e".to_string())])
				]),
				Some(list![None, Some(list![Some("f".to_string())])])
			]),
		},
	];

	assert_eq!(rows, expected_rows);
}

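// The projections below name fields that are not top-level columns of the file,
// so reading must fail with a descriptive schema-mismatch error rather than panic.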
#[test]
fn test_file_reader_rows_invalid_projection_derived() {
	#[derive(PartialEq, Record, Debug)]
	struct SparkSchema {
		key: i32,
		value: bool,
	}

	let res = test_file_reader_rows::<SparkSchema>("nested_maps.snappy.parquet", None);

	assert_eq!(
		res.unwrap_err(),
		ParquetError::General("Types don't match schema.\nSchema is:\nmessage spark_schema {\n    OPTIONAL group a (MAP) {\n        REPEATED group key_value {\n            REQUIRED byte_array key (UTF8);\n            OPTIONAL group value (MAP) {\n                REPEATED group key_value {\n                    REQUIRED int32 key (INT_32);\n                    REQUIRED boolean value;\n                }\n            }\n        }\n    }\n    REQUIRED int32 b (INT_32);\n    REQUIRED double c;\n}\nBut types require:\nmessage <name> {\n    REQUIRED int32 key (INT_32);\n    REQUIRED boolean value;\n}\nError: Parquet error: Struct \"SparkSchema\" has field \"key\" not in the schema".to_string())
	);
}

#[test]
fn test_row_group_rows_invalid_projection_derived() {
	#[derive(PartialEq, Record, Debug)]
	struct SparkSchema {
		key: i32,
		value: bool,
	}

	let res = test_row_group_rows::<SparkSchema>("nested_maps.snappy.parquet", None);

	assert_eq!(
		res.unwrap_err(),
		ParquetError::General("Types don't match schema.\nSchema is:\nmessage spark_schema {\n    OPTIONAL group a (MAP) {\n        REPEATED group key_value {\n            REQUIRED byte_array key (UTF8);\n            OPTIONAL group value (MAP) {\n                REPEATED group key_value {\n                    REQUIRED int32 key (INT_32);\n                    REQUIRED boolean value;\n                }\n            }\n        }\n    }\n    REQUIRED int32 b (INT_32);\n    REQUIRED double c;\n}\nBut types require:\nmessage <name> {\n    REQUIRED int32 key (INT_32);\n    REQUIRED boolean value;\n}\nError: Parquet error: Struct \"SparkSchema\" has field \"key\" not in the schema".to_string())
	);
}

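// Test helpers: read every row of a test file, either through the file-level
// row iterator or through the first row group's iterator. The `schema` argument
// is a placeholder for an explicit projection and must be `None` here.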
fn test_file_reader_rows<T>(file_name: &str, schema: Option<()>) -> Result<Vec<T>, ParquetError>
where
	T: Record,
{
	assert!(schema.is_none());
	let file = get_test_file(file_name);
	let file_reader: SerializedFileReader<_> = SerializedFileReader::new(file)?;
	let iter = file_reader.get_row_iter(None)?;
	Ok(iter.map(Result::unwrap).collect())
}

fn test_row_group_rows<T>(file_name: &str, schema: Option<()>) -> Result<Vec<T>, ParquetError>
where
	T: Record,
{
	assert!(schema.is_none());
	let file = get_test_file(file_name);
	let file_reader: SerializedFileReader<_> = SerializedFileReader::new(file)?;
	// Check only the first row group: each test file contains a single row group.
	let row_group_reader = file_reader.get_row_group(0).unwrap();
	let iter = row_group_reader.get_row_iter(None)?;
	Ok(iter.map(Result::unwrap).collect())
}

fn get_test_file(file_name: &str) -> fs::File {
	fs::File::open(get_test_path(file_name).as_path())
		.unwrap_or_else(|_| panic!("Test file {} not found", file_name))
}

fn get_test_path(file_name: &str) -> PathBuf {
	// Use CARGO_MANIFEST_DIR rather than env::current_dir() to work around
	// https://github.com/rust-lang/miri/issues/1514
	let mut pathbuf = PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap());
	pathbuf.push(PathBuf::from_str("../amadeus-testing/parquet").unwrap());
	pathbuf.push(file_name);
	pathbuf
}