alopex_sql/executor/bulk/
csv.rs

use std::fs;

use crate::catalog::TableMetadata;
use crate::executor::{ExecutorError, Result};
use crate::storage::SqlValue;

use super::{BulkReader, CopyField, CopySchema, parse_value};
/// Simple CSV reader that loads the entire file into memory up front.
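///
/// A minimal usage sketch (the call site is hypothetical; `table_meta` is
/// assumed to be the `TableMetadata` of the target table):
///
/// ```ignore
/// let mut reader = CsvReader::open("users.csv", &table_meta, /* header */ true)?;
/// while let Some(batch) = reader.next_batch(1024)? {
///     // hand each batch of `SqlValue` rows to the insert path
/// }
/// ```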
pub struct CsvReader {
    schema: CopySchema,
    rows: Vec<Vec<SqlValue>>,
    position: usize,
}

impl CsvReader {
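    /// Reads the whole file at `path` eagerly and parses every row against
    /// the column types in `table_meta`. When `header` is true, the first
    /// line is consumed as a header row and its names are carried into the
    /// resulting schema.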
    pub fn open(path: &str, table_meta: &TableMetadata, header: bool) -> Result<Self> {
        let content = fs::read_to_string(path)
            .map_err(|e| ExecutorError::BulkLoad(format!("failed to read CSV: {e}")))?;

        let mut lines = content.lines();
        let header_names: Option<Vec<String>> = if header {
            lines
                .next()
                .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
        } else {
            None
        };

        let schema_fields = table_meta
            .columns
            .iter()
            .enumerate()
            .map(|(idx, col)| CopyField {
                name: header_names
                    .as_ref()
                    .and_then(|names| names.get(idx))
                    .map(|s| s.to_string()),
                data_type: Some(col.data_type.clone()),
            })
            .collect();

        let mut rows = Vec::new();
        for line in lines {
            if line.trim().is_empty() {
                continue;
            }
            let mut parts: Vec<String> = line.split(',').map(|s| s.to_string()).collect();
            if parts.len() != table_meta.column_count() {
                // Special case: if the last column is a VECTOR, merge the surplus
                // parts back into it to make the counts line up (embeddings contain commas).
                if let Some(last_ty) = table_meta.columns.last().map(|c| &c.data_type)
                    && matches!(last_ty, crate::planner::types::ResolvedType::Vector { .. })
                    && parts.len() > table_meta.column_count()
                {
                    let head_count = table_meta.column_count().saturating_sub(1);
                    let tail = parts.split_off(head_count);
                    let merged = tail.join(",");
                    parts.push(merged);
                }
            }
            if parts.len() != table_meta.column_count() {
                return Err(ExecutorError::BulkLoad(format!(
                    "column count mismatch in row: expected {}, got {}",
                    table_meta.column_count(),
                    parts.len()
                )));
            }
            let mut parsed = Vec::with_capacity(parts.len());
            for (idx, raw) in parts.iter().enumerate() {
                let value = parse_value(raw, &table_meta.columns[idx].data_type)?;
                parsed.push(value);
            }
            rows.push(parsed);
        }

        Ok(Self {
            schema: CopySchema {
                fields: schema_fields,
            },
            rows,
            position: 0,
        })
    }
}

impl BulkReader for CsvReader {
    fn schema(&self) -> &CopySchema {
        &self.schema
    }

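    /// Returns the next batch of at most `max_rows` rows, or `None` once all
    /// rows have been consumed.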
    fn next_batch(&mut self, max_rows: usize) -> Result<Option<Vec<Vec<SqlValue>>>> {
        if self.position >= self.rows.len() {
            return Ok(None);
        }
        let end = (self.position + max_rows).min(self.rows.len());
        let batch = self.rows[self.position..end].to_vec();
        self.position = end;
        Ok(Some(batch))
    }
}