use crate::storage::schema::types::Value;
use crate::storage::Store;
use crate::storage::{EntityData, EntityKind, RowData, UnifiedEntity};
use std::collections::HashMap;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;
use std::sync::Arc;
const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
#[derive(Debug, Clone)]
pub struct ParquetConfig {
pub columns: Option<Vec<String>>,
pub id_field: Option<String>,
pub embedding_field: Option<String>,
pub collection: String,
pub max_rows: Option<usize>,
pub batch_size: usize,
}
impl Default for ParquetConfig {
fn default() -> Self {
Self {
columns: None,
id_field: None,
embedding_field: None,
collection: "parquet_import".to_string(),
max_rows: None,
batch_size: 10000,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct ParquetImportStats {
pub rows_imported: usize,
pub columns_read: usize,
pub duration_ms: u64,
pub file_size_bytes: u64,
}
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u8)]
enum ParquetType {
Boolean = 0,
Int32 = 1,
Int64 = 2,
Int96 = 3, Float = 4,
Double = 5,
ByteArray = 6,
FixedLenByteArray = 7,
}
impl ParquetType {
fn from_u8(v: u8) -> Option<Self> {
match v {
0 => Some(Self::Boolean),
1 => Some(Self::Int32),
2 => Some(Self::Int64),
3 => Some(Self::Int96),
4 => Some(Self::Float),
5 => Some(Self::Double),
6 => Some(Self::ByteArray),
7 => Some(Self::FixedLenByteArray),
_ => None,
}
}
}
#[derive(Debug, Clone)]
struct ColumnMeta {
name: String,
ptype: ParquetType,
offset: u64,
size: u64,
num_values: usize,
}
pub struct ParquetReader {
config: ParquetConfig,
}
impl ParquetReader {
pub fn new(config: ParquetConfig) -> Self {
Self { config }
}
pub fn with_defaults() -> Self {
Self::new(ParquetConfig::default())
}
pub fn import_file<P: AsRef<Path>>(
&self,
path: P,
store: &mut Store,
) -> Result<ParquetImportStats, ParquetError> {
let start = std::time::Instant::now();
let mut file = File::open(path.as_ref()).map_err(|e| ParquetError::Io(e.to_string()))?;
let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
let mut magic_start = [0u8; 4];
file.read_exact(&mut magic_start)
.map_err(|e| ParquetError::Io(e.to_string()))?;
if magic_start != PARQUET_MAGIC {
return Err(ParquetError::Format(
"Invalid Parquet magic at start".to_string(),
));
}
file.seek(SeekFrom::End(-4))
.map_err(|e| ParquetError::Io(e.to_string()))?;
let mut magic_end = [0u8; 4];
file.read_exact(&mut magic_end)
.map_err(|e| ParquetError::Io(e.to_string()))?;
if magic_end != PARQUET_MAGIC {
return Err(ParquetError::Format(
"Invalid Parquet magic at end".to_string(),
));
}
file.seek(SeekFrom::End(-8))
.map_err(|e| ParquetError::Io(e.to_string()))?;
let mut footer_len_bytes = [0u8; 4];
file.read_exact(&mut footer_len_bytes)
.map_err(|e| ParquetError::Io(e.to_string()))?;
let footer_len = u32::from_le_bytes(footer_len_bytes) as u64;
let footer_start = file_size - 8 - footer_len;
file.seek(SeekFrom::Start(footer_start))
.map_err(|e| ParquetError::Io(e.to_string()))?;
let mut footer = vec![0u8; footer_len as usize];
file.read_exact(&mut footer)
.map_err(|e| ParquetError::Io(e.to_string()))?;
let (columns, num_rows) = self.parse_footer(&footer)?;
let columns_read = columns.len();
let mut rows_imported = 0;
let max_rows = self.config.max_rows.unwrap_or(num_rows);
let rows_to_read = max_rows.min(num_rows);
if rows_to_read > 0 && !columns.is_empty() {
let mut column_data: HashMap<String, Vec<Value>> = HashMap::new();
for col in &columns {
if let Some(ref wanted) = self.config.columns {
if !wanted.contains(&col.name) {
continue;
}
}
file.seek(SeekFrom::Start(col.offset))
.map_err(|e| ParquetError::Io(e.to_string()))?;
let mut data = vec![0u8; col.size as usize];
file.read_exact(&mut data)
.map_err(|e| ParquetError::Io(e.to_string()))?;
let values = self.decode_column(&data, col, rows_to_read)?;
column_data.insert(col.name.clone(), values);
}
for row_idx in 0..rows_to_read {
let mut named: HashMap<String, Value> = HashMap::new();
for (col_name, values) in &column_data {
if row_idx < values.len() {
named.insert(col_name.clone(), values[row_idx].clone());
}
}
let entity_id = store.next_entity_id();
let row_id = entity_id.0;
let row_data = RowData {
columns: Vec::new(),
named: Some(named),
schema: None,
};
let entity = UnifiedEntity::new(
entity_id,
EntityKind::TableRow {
table: Arc::from(self.config.collection.as_str()),
row_id,
},
EntityData::Row(row_data),
);
store
.insert(&self.config.collection, entity)
.map_err(|e| ParquetError::Import(format!("{:?}", e)))?;
rows_imported += 1;
}
}
Ok(ParquetImportStats {
rows_imported,
columns_read,
duration_ms: start.elapsed().as_millis() as u64,
file_size_bytes: file_size,
})
}
fn parse_footer(&self, data: &[u8]) -> Result<(Vec<ColumnMeta>, usize), ParquetError> {
let mut columns = Vec::new();
let mut num_rows = 0;
if !data.is_empty() {
let field_type = data[0] & 0x0F;
if field_type == 5 && data.len() >= 5 {
}
}
let mut i = 0;
while i + 10 < data.len() {
if data[i] == 0x16 || data[i] == 0x26 {
if i + 9 <= data.len() {
let val = read_i64_compact(&data[i + 1..]);
if val > 0 && val < 10_000_000_000 {
num_rows = val as usize;
}
}
}
i += 1;
}
if columns.is_empty() {
let mut text_start = None;
for (idx, &b) in data.iter().enumerate() {
if (0x20..=0x7E).contains(&b) {
if text_start.is_none() {
text_start = Some(idx);
}
} else if let Some(start) = text_start {
let len = idx - start;
if (2..=50).contains(&len) {
if let Ok(name) = std::str::from_utf8(&data[start..idx]) {
if !name.contains(' ')
&& name.chars().all(|c| c.is_alphanumeric() || c == '_')
{
columns.push(ColumnMeta {
name: name.to_string(),
ptype: ParquetType::ByteArray,
offset: 0,
size: 0,
num_values: num_rows,
});
}
}
}
text_start = None;
}
}
}
if num_rows == 0 {
num_rows = 1000; }
Ok((columns, num_rows))
}
fn decode_column(
&self,
data: &[u8],
col: &ColumnMeta,
max_values: usize,
) -> Result<Vec<Value>, ParquetError> {
let num_values = col.num_values.min(max_values);
let mut values = Vec::with_capacity(num_values);
match col.ptype {
ParquetType::Boolean => {
for i in 0..num_values {
let byte_idx = i / 8;
let bit_idx = i % 8;
if byte_idx < data.len() {
let bit = (data[byte_idx] >> bit_idx) & 1;
values.push(Value::Boolean(bit == 1));
}
}
}
ParquetType::Int32 => {
let mut pos = 0;
for _ in 0..num_values {
if pos + 4 <= data.len() {
let val = i32::from_le_bytes([
data[pos],
data[pos + 1],
data[pos + 2],
data[pos + 3],
]);
values.push(Value::Integer(val as i64));
pos += 4;
}
}
}
ParquetType::Int64 => {
let mut pos = 0;
for _ in 0..num_values {
if pos + 8 <= data.len() {
let val = i64::from_le_bytes([
data[pos],
data[pos + 1],
data[pos + 2],
data[pos + 3],
data[pos + 4],
data[pos + 5],
data[pos + 6],
data[pos + 7],
]);
values.push(Value::Integer(val));
pos += 8;
}
}
}
ParquetType::Float => {
let mut pos = 0;
for _ in 0..num_values {
if pos + 4 <= data.len() {
let val = f32::from_le_bytes([
data[pos],
data[pos + 1],
data[pos + 2],
data[pos + 3],
]);
values.push(Value::Float(val as f64));
pos += 4;
}
}
}
ParquetType::Double => {
let mut pos = 0;
for _ in 0..num_values {
if pos + 8 <= data.len() {
let val = f64::from_le_bytes([
data[pos],
data[pos + 1],
data[pos + 2],
data[pos + 3],
data[pos + 4],
data[pos + 5],
data[pos + 6],
data[pos + 7],
]);
values.push(Value::Float(val));
pos += 8;
}
}
}
ParquetType::ByteArray | ParquetType::FixedLenByteArray => {
let mut pos = 0;
for _ in 0..num_values {
if pos + 4 <= data.len() {
let len = u32::from_le_bytes([
data[pos],
data[pos + 1],
data[pos + 2],
data[pos + 3],
]) as usize;
pos += 4;
if pos + len <= data.len() {
if let Ok(s) = std::str::from_utf8(&data[pos..pos + len]) {
values.push(Value::text(s.to_string()));
} else {
values.push(Value::Blob(data[pos..pos + len].to_vec()));
}
pos += len;
}
}
}
}
ParquetType::Int96 => {
let mut pos = 0;
for _ in 0..num_values {
if pos + 12 <= data.len() {
let nanos = i64::from_le_bytes([
data[pos],
data[pos + 1],
data[pos + 2],
data[pos + 3],
data[pos + 4],
data[pos + 5],
data[pos + 6],
data[pos + 7],
]);
values.push(Value::Integer(nanos));
pos += 12;
}
}
}
}
Ok(values)
}
}
fn read_i64_compact(data: &[u8]) -> i64 {
if data.len() >= 8 {
i64::from_le_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
])
} else {
0
}
}
#[derive(Debug)]
pub enum ParquetError {
Io(String),
Format(String),
Import(String),
}
impl std::fmt::Display for ParquetError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ParquetError::Io(s) => write!(f, "I/O error: {}", s),
ParquetError::Format(s) => write!(f, "Format error: {}", s),
ParquetError::Import(s) => write!(f, "Import error: {}", s),
}
}
}
impl std::error::Error for ParquetError {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parquet_magic() {
assert_eq!(PARQUET_MAGIC, [b'P', b'A', b'R', b'1']);
}
#[test]
fn test_parquet_type_from_u8() {
assert_eq!(ParquetType::from_u8(0), Some(ParquetType::Boolean));
assert_eq!(ParquetType::from_u8(1), Some(ParquetType::Int32));
assert_eq!(ParquetType::from_u8(5), Some(ParquetType::Double));
assert_eq!(ParquetType::from_u8(99), None);
}
#[test]
fn test_decode_int32() {
let reader = ParquetReader::with_defaults();
let data = vec![
0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, ];
let col = ColumnMeta {
name: "test".to_string(),
ptype: ParquetType::Int32,
offset: 0,
size: 12,
num_values: 3,
};
let values = reader.decode_column(&data, &col, 3).unwrap();
assert_eq!(values.len(), 3);
assert_eq!(values[0], Value::Integer(1));
assert_eq!(values[1], Value::Integer(2));
assert_eq!(values[2], Value::Integer(-1));
}
#[test]
fn test_decode_float() {
let reader = ParquetReader::with_defaults();
let val: f32 = 2.5;
let data = val.to_le_bytes().to_vec();
let col = ColumnMeta {
name: "test".to_string(),
ptype: ParquetType::Float,
offset: 0,
size: 4,
num_values: 1,
};
let values = reader.decode_column(&data, &col, 1).unwrap();
assert_eq!(values.len(), 1);
if let Value::Float(f) = values[0] {
assert!((f - 2.5).abs() < 0.001);
} else {
panic!("Expected float");
}
}
#[test]
fn test_config_default() {
let config = ParquetConfig::default();
assert_eq!(config.batch_size, 10000);
assert!(config.columns.is_none());
}
}