// alopex_sql/executor/bulk/csv.rs
use std::fs;
2
3use crate::catalog::TableMetadata;
4use crate::executor::{ExecutorError, Result};
5use crate::storage::SqlValue;
6
7use super::{BulkReader, CopyField, CopySchema, parse_value};
8
/// In-memory CSV bulk reader: the entire file is read and parsed up front
/// in `CsvReader::open`, then served out in batches through `BulkReader`.
pub struct CsvReader {
    // Output column layout (per-column name and type), derived from the
    // table metadata and, when present, the CSV header row.
    schema: CopySchema,
    // Every data row, fully parsed into typed values at open time.
    rows: Vec<Vec<SqlValue>>,
    // Cursor into `rows`; `next_batch` resumes reading from this index.
    position: usize,
}
15
16impl CsvReader {
17 pub fn open(path: &str, table_meta: &TableMetadata, header: bool) -> Result<Self> {
18 let content = fs::read_to_string(path)
19 .map_err(|e| ExecutorError::BulkLoad(format!("failed to read CSV: {e}")))?;
20
21 let mut lines = content.lines();
22 let header_names: Option<Vec<String>> = if header {
23 lines
24 .next()
25 .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
26 } else {
27 None
28 };
29
30 let schema_fields = table_meta
31 .columns
32 .iter()
33 .enumerate()
34 .map(|(idx, col)| CopyField {
35 name: header_names
36 .as_ref()
37 .and_then(|names| names.get(idx))
38 .map(|s| s.to_string()),
39 data_type: Some(col.data_type.clone()),
40 })
41 .collect();
42
43 let mut rows = Vec::new();
44 for line in lines {
45 if line.trim().is_empty() {
46 continue;
47 }
48 let mut parts: Vec<String> = line.split(',').map(|s| s.to_string()).collect();
49 if parts.len() != table_meta.column_count() {
50 if let Some(last_ty) = table_meta.columns.last().map(|c| &c.data_type)
52 && matches!(last_ty, crate::planner::types::ResolvedType::Vector { .. })
53 && parts.len() > table_meta.column_count()
54 {
55 let head_count = table_meta.column_count().saturating_sub(1);
56 let tail = parts.split_off(head_count);
57 let merged = tail.join(",");
58 parts.push(merged);
59 }
60 }
61 if parts.len() != table_meta.column_count() {
62 return Err(ExecutorError::BulkLoad(format!(
63 "column count mismatch in row: expected {}, got {}",
64 table_meta.column_count(),
65 parts.len()
66 )));
67 }
68 let mut parsed = Vec::with_capacity(parts.len());
69 for (idx, raw) in parts.iter().enumerate() {
70 let value = parse_value(raw, &table_meta.columns[idx].data_type)?;
71 parsed.push(value);
72 }
73 rows.push(parsed);
74 }
75
76 Ok(Self {
77 schema: CopySchema {
78 fields: schema_fields,
79 },
80 rows,
81 position: 0,
82 })
83 }
84}
85
86impl BulkReader for CsvReader {
87 fn schema(&self) -> &CopySchema {
88 &self.schema
89 }
90
91 fn next_batch(&mut self, max_rows: usize) -> Result<Option<Vec<Vec<SqlValue>>>> {
92 if self.position >= self.rows.len() {
93 return Ok(None);
94 }
95 let end = (self.position + max_rows).min(self.rows.len());
96 let batch = self.rows[self.position..end].to_vec();
97 self.position = end;
98 Ok(Some(batch))
99 }
100}