use crate::parse::{Pageviews, ParseError};
use arrow2::array::TryPush;
use arrow2::array::{
Array, MutableBooleanArray, MutableDictionaryArray, MutablePrimitiveArray, MutableUtf8Array,
};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::io::parquet::write::*;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
fn create_schema() -> Schema {
Schema::from(vec![
Field::new("domain_code", DataType::Utf8, false),
Field::new("page_title", DataType::Utf8, false),
Field::new("views", DataType::UInt32, false),
Field::new("language", DataType::Utf8, false),
Field::new("domain", DataType::Utf8, true),
Field::new("mobile", DataType::Boolean, false),
])
}
/// Adapter that groups rows pulled from an inner iterator into Arrow chunks.
///
/// The struct itself places no bound on `I`; the requirement that `I` yields
/// `Result<Pageviews, ParseError>` lives on the `Iterator` implementation,
/// which is the only place that needs it.
struct ChunkIterator<I> {
    /// Source of rows (or per-row errors) to be batched.
    iter: I,
    /// Maximum number of rows placed into a single chunk.
    batch_size: usize,
}
impl<I: Iterator<Item = Result<Pageviews, ParseError>>> Iterator for ChunkIterator<I> {
    type Item = Result<Chunk<Arc<dyn Array>>, arrow2::error::Error>;

    /// Builds the next chunk of at most `batch_size` rows.
    ///
    /// Columns are emitted in the order `domain_code`, `page_title`, `views`,
    /// `language`, `domain`, `mobile`. The `domain_code`, `language` and
    /// `domain` columns are built as `i32`-keyed dictionary arrays.
    ///
    /// Rows for which the inner iterator yields a parse error are skipped and
    /// do not count towards `batch_size`.
    ///
    /// Returns `None` once the inner iterator is exhausted and no row was
    /// collected (a `batch_size` of 0 also returns `None` without reading the
    /// inner iterator).
    ///
    /// # Errors
    ///
    /// If appending to one of the dictionary builders fails, the Arrow error
    /// is returned as `Some(Err(_))` and the rows gathered for the current
    /// batch are discarded.
    fn next(&mut self) -> Option<Self::Item> {
        type StringDictionary = MutableDictionaryArray<i32, MutableUtf8Array<i32>>;

        let mut domain_code_builder = StringDictionary::new();
        let mut page_title_builder = MutableUtf8Array::<i32>::new();
        let mut views_builder = MutablePrimitiveArray::<u32>::new();
        let mut language_builder = StringDictionary::new();
        let mut domain_builder = StringDictionary::new();
        let mut mobile_builder = MutableBooleanArray::new();

        let mut count = 0;
        while count < self.batch_size {
            let row = match self.iter.next() {
                Some(Ok(row)) => row,
                // Best effort: a row that failed to parse is dropped.
                Some(Err(_)) => continue,
                None => break,
            };

            // The fallible dictionary pushes run first so that a failure is
            // reported to the caller instead of silently ending iteration.
            if let Err(err) = domain_code_builder.try_push(Some(&row.domain_code)) {
                return Some(Err(err));
            }
            if let Err(err) = language_builder.try_push(Some(&row.parsed_domain_code.language)) {
                return Some(Err(err));
            }
            if let Err(err) = domain_builder.try_push(row.parsed_domain_code.domain) {
                return Some(Err(err));
            }

            page_title_builder.push(Some(&row.page_title));
            views_builder.push(Some(row.views));
            mobile_builder.push(Some(row.parsed_domain_code.mobile));
            count += 1;
        }

        if count == 0 {
            return None;
        }

        Some(Ok(Chunk::new(vec![
            domain_code_builder.into_arc(),
            page_title_builder.into_arc(),
            views_builder.into_arc(),
            language_builder.into_arc(),
            domain_builder.into_arc(),
            mobile_builder.into_arc(),
        ])))
    }
}
/// Turns a stream of parsed pageview rows into a stream of Arrow chunks.
///
/// `iterator` supplies the rows; items that are `Err` are skipped by the
/// returned iterator rather than reported.
///
/// `batch_size` is the maximum number of rows per chunk. `None` selects the
/// default of 122,880 rows. `Some(0)` is treated like `None`: a zero batch
/// size would otherwise produce an iterator that never yields a chunk and
/// never reads `iterator`, silently dropping every row.
///
/// Each item of the returned iterator is one chunk, or an Arrow error.
pub fn arrow_chunks_from_structs(
    iterator: impl Iterator<Item = Result<Pageviews, ParseError>>,
    batch_size: Option<usize>,
) -> impl Iterator<Item = Result<Chunk<Arc<dyn Array>>, arrow2::error::Error>> {
    const DEFAULT_BATCH_SIZE: usize = 122_880;

    let batch_size = batch_size
        .filter(|&rows| rows > 0)
        .unwrap_or(DEFAULT_BATCH_SIZE);

    ChunkIterator {
        iter: iterator,
        batch_size,
    }
}
/// Writes a stream of Arrow chunks to a Parquet file at `path`.
///
/// The file is created with `File::create`, so an existing file at `path` is
/// truncated. Chunks are expected to carry the six columns of
/// `create_schema()` in schema order. Write options used: no statistics, no
/// compression, format version 2, no data page size limit.
///
/// # Errors
///
/// Returns an error if the file cannot be created, if the row-group iterator
/// or file writer cannot be set up, if `chunks` yields an error, or if writing
/// a row group or the file footer fails.
pub fn parquet_from_arrow<I>(path: &Path, chunks: I) -> arrow2::error::Result<()>
where
    I: Iterator<Item = Result<Chunk<Arc<dyn Array>>, arrow2::error::Error>>,
{
    let file = File::create(path)?;
    let schema = create_schema();

    let options = WriteOptions {
        write_statistics: false,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
        data_pagesize_limit: None,
    };

    // One encoding list per column, positionally matching the schema fields:
    // domain_code, page_title, views, language, domain, mobile.
    let dictionary = || vec![Encoding::RleDictionary];
    let plain = || vec![Encoding::Plain];
    let encodings = vec![
        dictionary(),
        plain(),
        plain(),
        dictionary(),
        dictionary(),
        plain(),
    ];

    let row_groups = RowGroupIterator::try_new(chunks, &schema, options, encodings)?;
    let mut writer = FileWriter::try_new(file, schema, options)?;

    for row_group in row_groups {
        let row_group = row_group?;
        writer.write(row_group)?;
    }

    // Finish the file; no extra key/value metadata is supplied.
    writer.end(None)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parse::DomainCode;
    use crate::parse::ParseError;
    use arrow2::array::{BooleanArray, DictionaryArray, UInt32Array, Utf8Array};

    /// Two successfully parsed rows: a desktop English row and a mobile German row.
    fn make_pageviews() -> Vec<Result<Pageviews, ParseError>> {
        vec![
            Ok(Pageviews {
                domain_code: "en".to_string(),
                page_title: "Main_Page".to_string(),
                views: 1000,
                parsed_domain_code: DomainCode {
                    language: "en".to_string(),
                    domain: Some("wikipedia.org"),
                    mobile: false,
                },
            }),
            Ok(Pageviews {
                domain_code: "de.m".to_string(),
                page_title: "Startseite".to_string(),
                views: 500,
                parsed_domain_code: DomainCode {
                    language: "de".to_string(),
                    domain: Some("wikipedia.de"),
                    mobile: true,
                },
            }),
        ]
    }

    /// Resolves row `idx` of a string dictionary array to the string it refers to.
    fn dict_lookup(dict: &DictionaryArray<i32>, idx: usize) -> &str {
        let strings = dict
            .values()
            .as_any()
            .downcast_ref::<Utf8Array<i32>>()
            .unwrap();
        let key = dict.keys().value(idx) as usize;
        strings.value(key)
    }

    /// Downcasts column `index` of `chunk` to the concrete array type `T`,
    /// panicking if the column has a different type.
    fn column<T: 'static>(chunk: &Chunk<Arc<dyn Array>>, index: usize) -> &T {
        chunk.arrays()[index]
            .as_any()
            .downcast_ref::<T>()
            .unwrap()
    }

    #[test]
    fn test_arrow_from_structs() {
        let mut chunks = arrow_chunks_from_structs(make_pageviews().into_iter(), None);
        let chunk = chunks.next().unwrap().unwrap();

        assert_eq!(chunk.arrays().len(), 6);
        assert_eq!(chunk.len(), 2);

        let domain_codes: &DictionaryArray<i32> = column(&chunk, 0);
        assert_eq!(dict_lookup(domain_codes, 0), "en");
        assert_eq!(dict_lookup(domain_codes, 1), "de.m");

        let page_titles: &Utf8Array<i32> = column(&chunk, 1);
        assert_eq!(page_titles.value(0), "Main_Page");
        assert_eq!(page_titles.value(1), "Startseite");

        let views: &UInt32Array = column(&chunk, 2);
        assert_eq!(views.value(0), 1000);
        assert_eq!(views.value(1), 500);

        let languages: &DictionaryArray<i32> = column(&chunk, 3);
        assert_eq!(dict_lookup(languages, 0), "en");
        assert_eq!(dict_lookup(languages, 1), "de");

        let domains: &DictionaryArray<i32> = column(&chunk, 4);
        assert_eq!(dict_lookup(domains, 0), "wikipedia.org");
        assert_eq!(dict_lookup(domains, 1), "wikipedia.de");

        let mobile: &BooleanArray = column(&chunk, 5);
        assert!(!mobile.value(0));
        assert!(mobile.value(1));
    }
}