1use crate::storage::schema::types::Value;
23use crate::storage::Store;
24use crate::storage::{EntityData, EntityKind, RowData, UnifiedEntity};
25use std::collections::HashMap;
26use std::fs::File;
27use std::io::{Read, Seek, SeekFrom};
28use std::path::Path;
29use std::sync::Arc;
30
/// Parquet files begin and end with the ASCII bytes "PAR1".
const PARQUET_MAGIC: [u8; 4] = *b"PAR1";
33
/// Options controlling how a Parquet file is imported into a `Store`.
#[derive(Debug, Clone)]
pub struct ParquetConfig {
    /// Columns to import; `None` imports every column discovered in the footer.
    pub columns: Option<Vec<String>>,
    /// Name of the column holding row identifiers.
    /// NOTE(review): not referenced by the reader code in this file — confirm callers use it.
    pub id_field: Option<String>,
    /// Name of the column holding embedding vectors.
    /// NOTE(review): not referenced by the reader code in this file — confirm callers use it.
    pub embedding_field: Option<String>,
    /// Store collection that imported rows are inserted into.
    pub collection: String,
    /// Upper bound on the number of rows to import; `None` imports all detected rows.
    pub max_rows: Option<usize>,
    /// Intended insert batch size.
    /// NOTE(review): not referenced by the import loop in this file — confirm intended use.
    pub batch_size: usize,
}
50
51impl Default for ParquetConfig {
52 fn default() -> Self {
53 Self {
54 columns: None,
55 id_field: None,
56 embedding_field: None,
57 collection: "parquet_import".to_string(),
58 max_rows: None,
59 batch_size: 10000,
60 }
61 }
62}
63
/// Summary of a completed Parquet import.
#[derive(Debug, Clone, Default)]
pub struct ParquetImportStats {
    /// Number of row entities inserted into the store.
    pub rows_imported: usize,
    /// Number of columns discovered in the footer (before any column filter).
    pub columns_read: usize,
    /// Wall-clock duration of the import, in milliseconds.
    pub duration_ms: u64,
    /// Size of the source file, in bytes.
    pub file_size_bytes: u64,
}
72
/// Physical value types, with discriminants matching the Parquet format's
/// physical-type codes.
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u8)]
enum ParquetType {
    Boolean = 0,
    Int32 = 1,
    Int64 = 2,
    Int96 = 3,
    Float = 4,
    Double = 5,
    ByteArray = 6,
    FixedLenByteArray = 7,
}

impl ParquetType {
    /// Maps a raw physical-type code to its variant; `None` for codes above 7.
    fn from_u8(v: u8) -> Option<Self> {
        // Discriminants are dense in 0..=7, so a lookup table suffices.
        const TABLE: [ParquetType; 8] = [
            ParquetType::Boolean,
            ParquetType::Int32,
            ParquetType::Int64,
            ParquetType::Int96,
            ParquetType::Float,
            ParquetType::Double,
            ParquetType::ByteArray,
            ParquetType::FixedLenByteArray,
        ];
        TABLE.get(v as usize).copied()
    }
}
102
/// Metadata for one column chunk, as recovered by the footer heuristics.
#[derive(Debug, Clone)]
struct ColumnMeta {
    /// Column name.
    name: String,
    /// Physical Parquet type of the stored values.
    ptype: ParquetType,
    /// Byte offset of the column data within the file.
    offset: u64,
    /// Length of the column data, in bytes.
    size: u64,
    /// Number of values stored in the column.
    num_values: usize,
}
112
/// Minimal, dependency-free Parquet importer.
///
/// NOTE(review): this is a heuristic reader (see `parse_footer`), not a full
/// Thrift/Parquet metadata decoder — suitable for best-effort imports only.
pub struct ParquetReader {
    // Import options; fixed at construction time.
    config: ParquetConfig,
}
117
118impl ParquetReader {
    /// Creates a reader that imports according to `config`.
    pub fn new(config: ParquetConfig) -> Self {
        Self { config }
    }
123
124 pub fn with_defaults() -> Self {
126 Self::new(ParquetConfig::default())
127 }
128
129 pub fn import_file<P: AsRef<Path>>(
131 &self,
132 path: P,
133 store: &mut Store,
134 ) -> Result<ParquetImportStats, ParquetError> {
135 let start = std::time::Instant::now();
136 let mut file = File::open(path.as_ref()).map_err(|e| ParquetError::Io(e.to_string()))?;
137
138 let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
139
140 let mut magic_start = [0u8; 4];
142 file.read_exact(&mut magic_start)
143 .map_err(|e| ParquetError::Io(e.to_string()))?;
144 if magic_start != PARQUET_MAGIC {
145 return Err(ParquetError::Format(
146 "Invalid Parquet magic at start".to_string(),
147 ));
148 }
149
150 file.seek(SeekFrom::End(-4))
152 .map_err(|e| ParquetError::Io(e.to_string()))?;
153 let mut magic_end = [0u8; 4];
154 file.read_exact(&mut magic_end)
155 .map_err(|e| ParquetError::Io(e.to_string()))?;
156 if magic_end != PARQUET_MAGIC {
157 return Err(ParquetError::Format(
158 "Invalid Parquet magic at end".to_string(),
159 ));
160 }
161
162 file.seek(SeekFrom::End(-8))
164 .map_err(|e| ParquetError::Io(e.to_string()))?;
165 let mut footer_len_bytes = [0u8; 4];
166 file.read_exact(&mut footer_len_bytes)
167 .map_err(|e| ParquetError::Io(e.to_string()))?;
168 let footer_len = u32::from_le_bytes(footer_len_bytes) as u64;
169
170 let footer_start = file_size - 8 - footer_len;
172 file.seek(SeekFrom::Start(footer_start))
173 .map_err(|e| ParquetError::Io(e.to_string()))?;
174
175 let mut footer = vec![0u8; footer_len as usize];
176 file.read_exact(&mut footer)
177 .map_err(|e| ParquetError::Io(e.to_string()))?;
178
179 let (columns, num_rows) = self.parse_footer(&footer)?;
181
182 let columns_read = columns.len();
183 let mut rows_imported = 0;
184
185 let max_rows = self.config.max_rows.unwrap_or(num_rows);
187 let rows_to_read = max_rows.min(num_rows);
188
189 if rows_to_read > 0 && !columns.is_empty() {
190 let mut column_data: HashMap<String, Vec<Value>> = HashMap::new();
192
193 for col in &columns {
194 if let Some(ref wanted) = self.config.columns {
195 if !wanted.contains(&col.name) {
196 continue;
197 }
198 }
199
200 file.seek(SeekFrom::Start(col.offset))
201 .map_err(|e| ParquetError::Io(e.to_string()))?;
202
203 let mut data = vec![0u8; col.size as usize];
204 file.read_exact(&mut data)
205 .map_err(|e| ParquetError::Io(e.to_string()))?;
206
207 let values = self.decode_column(&data, col, rows_to_read)?;
208 column_data.insert(col.name.clone(), values);
209 }
210
211 for row_idx in 0..rows_to_read {
213 let mut named: HashMap<String, Value> = HashMap::new();
214
215 for (col_name, values) in &column_data {
216 if row_idx < values.len() {
217 named.insert(col_name.clone(), values[row_idx].clone());
218 }
219 }
220
221 let entity_id = store.next_entity_id();
222 let row_id = entity_id.0;
223
224 let row_data = RowData {
225 columns: Vec::new(),
226 named: Some(named),
227 schema: None,
228 };
229
230 let entity = UnifiedEntity::new(
231 entity_id,
232 EntityKind::TableRow {
233 table: Arc::from(self.config.collection.as_str()),
234 row_id,
235 },
236 EntityData::Row(row_data),
237 );
238
239 store
240 .insert(&self.config.collection, entity)
241 .map_err(|e| ParquetError::Import(format!("{:?}", e)))?;
242
243 rows_imported += 1;
244 }
245 }
246
247 Ok(ParquetImportStats {
248 rows_imported,
249 columns_read,
250 duration_ms: start.elapsed().as_millis() as u64,
251 file_size_bytes: file_size,
252 })
253 }
254
255 fn parse_footer(&self, data: &[u8]) -> Result<(Vec<ColumnMeta>, usize), ParquetError> {
257 let mut columns = Vec::new();
261 let mut num_rows = 0;
262 if !data.is_empty() {
264 let field_type = data[0] & 0x0F;
265 if field_type == 5 && data.len() >= 5 {
266 }
268 }
269
270 let mut i = 0;
275 while i + 10 < data.len() {
276 if data[i] == 0x16 || data[i] == 0x26 {
281 if i + 9 <= data.len() {
283 let val = read_i64_compact(&data[i + 1..]);
284 if val > 0 && val < 10_000_000_000 {
285 num_rows = val as usize;
286 }
287 }
288 }
289
290 i += 1;
291 }
292
293 if columns.is_empty() {
295 let mut text_start = None;
299 for (idx, &b) in data.iter().enumerate() {
300 if (0x20..=0x7E).contains(&b) {
301 if text_start.is_none() {
303 text_start = Some(idx);
304 }
305 } else if let Some(start) = text_start {
306 let len = idx - start;
307 if (2..=50).contains(&len) {
308 if let Ok(name) = std::str::from_utf8(&data[start..idx]) {
310 if !name.contains(' ')
311 && name.chars().all(|c| c.is_alphanumeric() || c == '_')
312 {
313 columns.push(ColumnMeta {
315 name: name.to_string(),
316 ptype: ParquetType::ByteArray,
317 offset: 0,
318 size: 0,
319 num_values: num_rows,
320 });
321 }
322 }
323 }
324 text_start = None;
325 }
326 }
327 }
328
329 if num_rows == 0 {
331 num_rows = 1000; }
333
334 Ok((columns, num_rows))
335 }
336
337 fn decode_column(
339 &self,
340 data: &[u8],
341 col: &ColumnMeta,
342 max_values: usize,
343 ) -> Result<Vec<Value>, ParquetError> {
344 let num_values = col.num_values.min(max_values);
345 let mut values = Vec::with_capacity(num_values);
346
347 match col.ptype {
348 ParquetType::Boolean => {
349 for i in 0..num_values {
350 let byte_idx = i / 8;
351 let bit_idx = i % 8;
352 if byte_idx < data.len() {
353 let bit = (data[byte_idx] >> bit_idx) & 1;
354 values.push(Value::Boolean(bit == 1));
355 }
356 }
357 }
358 ParquetType::Int32 => {
359 let mut pos = 0;
360 for _ in 0..num_values {
361 if pos + 4 <= data.len() {
362 let val = i32::from_le_bytes([
363 data[pos],
364 data[pos + 1],
365 data[pos + 2],
366 data[pos + 3],
367 ]);
368 values.push(Value::Integer(val as i64));
369 pos += 4;
370 }
371 }
372 }
373 ParquetType::Int64 => {
374 let mut pos = 0;
375 for _ in 0..num_values {
376 if pos + 8 <= data.len() {
377 let val = i64::from_le_bytes([
378 data[pos],
379 data[pos + 1],
380 data[pos + 2],
381 data[pos + 3],
382 data[pos + 4],
383 data[pos + 5],
384 data[pos + 6],
385 data[pos + 7],
386 ]);
387 values.push(Value::Integer(val));
388 pos += 8;
389 }
390 }
391 }
392 ParquetType::Float => {
393 let mut pos = 0;
394 for _ in 0..num_values {
395 if pos + 4 <= data.len() {
396 let val = f32::from_le_bytes([
397 data[pos],
398 data[pos + 1],
399 data[pos + 2],
400 data[pos + 3],
401 ]);
402 values.push(Value::Float(val as f64));
403 pos += 4;
404 }
405 }
406 }
407 ParquetType::Double => {
408 let mut pos = 0;
409 for _ in 0..num_values {
410 if pos + 8 <= data.len() {
411 let val = f64::from_le_bytes([
412 data[pos],
413 data[pos + 1],
414 data[pos + 2],
415 data[pos + 3],
416 data[pos + 4],
417 data[pos + 5],
418 data[pos + 6],
419 data[pos + 7],
420 ]);
421 values.push(Value::Float(val));
422 pos += 8;
423 }
424 }
425 }
426 ParquetType::ByteArray | ParquetType::FixedLenByteArray => {
427 let mut pos = 0;
429 for _ in 0..num_values {
430 if pos + 4 <= data.len() {
431 let len = u32::from_le_bytes([
432 data[pos],
433 data[pos + 1],
434 data[pos + 2],
435 data[pos + 3],
436 ]) as usize;
437 pos += 4;
438 if pos + len <= data.len() {
439 if let Ok(s) = std::str::from_utf8(&data[pos..pos + len]) {
440 values.push(Value::text(s.to_string()));
441 } else {
442 values.push(Value::Blob(data[pos..pos + len].to_vec()));
443 }
444 pos += len;
445 }
446 }
447 }
448 }
449 ParquetType::Int96 => {
450 let mut pos = 0;
452 for _ in 0..num_values {
453 if pos + 12 <= data.len() {
454 let nanos = i64::from_le_bytes([
456 data[pos],
457 data[pos + 1],
458 data[pos + 2],
459 data[pos + 3],
460 data[pos + 4],
461 data[pos + 5],
462 data[pos + 6],
463 data[pos + 7],
464 ]);
465 values.push(Value::Integer(nanos));
466 pos += 12;
467 }
468 }
469 }
470 }
471
472 Ok(values)
473 }
474}
475
/// Reads the first 8 bytes of `data` as a little-endian i64, or 0 when fewer
/// than 8 bytes are available.
///
/// Despite the name, this does not perform Thrift compact-protocol (zigzag
/// varint) decoding; it is part of the heuristic footer scan.
fn read_i64_compact(data: &[u8]) -> i64 {
    match data.get(..8) {
        Some(head) => {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(head);
            i64::from_le_bytes(buf)
        }
        None => 0,
    }
}
486
/// Errors produced by the Parquet import path.
#[derive(Debug)]
pub enum ParquetError {
    /// Filesystem/read failure, carrying the underlying error text.
    Io(String),
    /// Structurally invalid Parquet data (bad magic, truncated footer, ...).
    Format(String),
    /// The target store rejected an entity during import.
    Import(String),
}

impl std::fmt::Display for ParquetError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant renders as "<label>: <detail>".
        let (label, detail) = match self {
            ParquetError::Io(s) => ("I/O error", s),
            ParquetError::Format(s) => ("Format error", s),
            ParquetError::Import(s) => ("Import error", s),
        };
        write!(f, "{}: {}", label, detail)
    }
}

impl std::error::Error for ParquetError {}
506
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parquet_magic() {
        // The marker must spell "PAR1" in ASCII.
        assert_eq!(PARQUET_MAGIC, [b'P', b'A', b'R', b'1']);
    }

    #[test]
    fn test_parquet_type_from_u8() {
        assert_eq!(ParquetType::from_u8(0), Some(ParquetType::Boolean));
        assert_eq!(ParquetType::from_u8(1), Some(ParquetType::Int32));
        assert_eq!(ParquetType::from_u8(5), Some(ParquetType::Double));
        assert_eq!(ParquetType::from_u8(99), None);
    }

    #[test]
    fn test_decode_int32() {
        let reader = ParquetReader::with_defaults();
        // Three PLAIN little-endian i32 values: 1, 2, -1.
        let raw: Vec<u8> = [1i32, 2, -1]
            .iter()
            .flat_map(|v| v.to_le_bytes().to_vec())
            .collect();
        let meta = ColumnMeta {
            name: "test".to_string(),
            ptype: ParquetType::Int32,
            offset: 0,
            size: 12,
            num_values: 3,
        };

        let decoded = reader.decode_column(&raw, &meta, 3).unwrap();
        assert_eq!(decoded.len(), 3);
        assert_eq!(decoded[0], Value::Integer(1));
        assert_eq!(decoded[1], Value::Integer(2));
        assert_eq!(decoded[2], Value::Integer(-1));
    }

    #[test]
    fn test_decode_float() {
        let reader = ParquetReader::with_defaults();
        let raw = 2.5f32.to_le_bytes().to_vec();
        let meta = ColumnMeta {
            name: "test".to_string(),
            ptype: ParquetType::Float,
            offset: 0,
            size: 4,
            num_values: 1,
        };

        let decoded = reader.decode_column(&raw, &meta, 1).unwrap();
        assert_eq!(decoded.len(), 1);
        match decoded[0] {
            Value::Float(f) => assert!((f - 2.5).abs() < 0.001),
            _ => panic!("Expected float"),
        }
    }

    #[test]
    fn test_config_default() {
        let config = ParquetConfig::default();
        assert_eq!(config.batch_size, 10000);
        assert!(config.columns.is_none());
    }
}