use std::path::PathBuf;
use super::{FdwError, FdwOptions, ForeignDataWrapper, WrapperState};
use crate::storage::import::csv::{CsvConfig, CsvImporter};
use crate::storage::query::unified::UnifiedRecord;
use crate::storage::schema::Value;
/// Foreign data wrapper of kind `"csv"`: reads a CSV file from the local
/// filesystem and returns its rows as records.
pub struct CsvForeignWrapper;
/// Server-level state for the CSV wrapper, built from the server options.
struct CsvServerState {
/// Value of the optional `base_path` server option. When present, relative
/// table paths are joined onto it before the file is accessed.
base_path: Option<PathBuf>,
}
// Lets callers holding a `dyn WrapperState` recover the concrete
// `CsvServerState` through `Any::downcast_ref`.
impl WrapperState for CsvServerState {
/// Returns `self` as `&dyn Any` so it can be downcast to `CsvServerState`.
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
impl ForeignDataWrapper for CsvForeignWrapper {
    /// Identifier of this wrapper kind: always `"csv"`.
    fn kind(&self) -> &'static str {
        "csv"
    }

    /// Builds the server-level state from the server options.
    ///
    /// The only option read is `base_path` (optional); it is stored so that
    /// relative table paths can later be resolved against it.
    fn build_server_state(
        &self,
        options: &FdwOptions,
    ) -> Result<Option<std::sync::Arc<dyn WrapperState>>, FdwError> {
        let base_path = options.get("base_path").map(PathBuf::from);
        Ok(Some(std::sync::Arc::new(CsvServerState { base_path })))
    }

    /// Reads the whole CSV file and returns one record per data row.
    ///
    /// Table options:
    /// - `path` (required): file to read; a relative path is joined onto the
    ///   server's `base_path` when one is configured.
    /// - `delimiter`: first byte of the value is used, default `,`.
    /// - `quote`: first byte of the value is used, default `"`.
    /// - `header`: `false`, `0` or `no` (case-insensitive) disable header
    ///   handling; any other value, or absence, enables it.
    /// - `treat_empty_as_null`: same flag syntax, default enabled.
    ///
    /// With a header, column names come from the first row; fields beyond the
    /// header width, and all fields when there is no header, are named
    /// `c{index}`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `require("path")` when the option is absent,
    /// and returns `FdwError::Io` when the file cannot be read or the CSV
    /// text cannot be parsed.
    fn scan(
        &self,
        server_state: Option<&std::sync::Arc<dyn WrapperState>>,
        table_options: &FdwOptions,
    ) -> Result<Vec<UnifiedRecord>, FdwError> {
        let rel = table_options.require("path")?;
        let path = resolve_table_path(server_state, PathBuf::from(rel));

        let delimiter = first_byte_option(table_options, "delimiter", b',');
        let quote = first_byte_option(table_options, "quote", b'"');
        let has_header = flag_option(table_options, "header", true);
        let treat_empty_as_null = flag_option(table_options, "treat_empty_as_null", true);

        let text = std::fs::read_to_string(&path)
            .map_err(|e| FdwError::Io(format!("read '{}': {e}", path.display())))?;
        let records = parse_csv_records(&text, delimiter, quote).map_err(FdwError::Io)?;

        let mut iter = records.into_iter();
        let headers: Vec<String> = if has_header {
            match iter.next() {
                Some(row) => row,
                // A file with no rows at all has no header and no data.
                None => return Ok(Vec::new()),
            }
        } else {
            Vec::new()
        };

        let mut out: Vec<UnifiedRecord> = Vec::new();
        for row in iter {
            let mut record = UnifiedRecord::with_capacity(row.len());
            for (i, field) in row.into_iter().enumerate() {
                let value = coerce_field(&field, treat_empty_as_null);
                // Borrow the header name instead of cloning the header list
                // for every row; fall back to a positional name when the row
                // is wider than the header (or there is no header).
                match headers.get(i) {
                    Some(name) => {
                        record.set(name, value);
                    }
                    None => {
                        record.set(&format!("c{i}"), value);
                    }
                }
            }
            out.push(record);
        }
        Ok(out)
    }

    /// Rough row estimate derived from the file size: one row per 128 bytes,
    /// never less than 1.
    ///
    /// Returns `None` when the `path` option is absent or the file metadata
    /// cannot be read.
    fn estimated_row_count(
        &self,
        server_state: Option<&std::sync::Arc<dyn WrapperState>>,
        table_options: &FdwOptions,
    ) -> Option<usize> {
        let rel = table_options.get("path")?;
        let path = resolve_table_path(server_state, PathBuf::from(rel));
        std::fs::metadata(&path)
            .ok()
            .map(|m| (m.len() / 128).max(1) as usize)
    }
}

/// Resolves a table path: absolute paths are returned unchanged, relative
/// paths are joined onto the server's `base_path` when the server state is a
/// `CsvServerState` that has one, and returned unchanged otherwise.
fn resolve_table_path(
    server_state: Option<&std::sync::Arc<dyn WrapperState>>,
    path: PathBuf,
) -> PathBuf {
    if !path.is_relative() {
        return path;
    }
    let base = server_state
        .and_then(|state| state.as_any().downcast_ref::<CsvServerState>())
        .and_then(|css| css.base_path.as_ref());
    match base {
        Some(base) => base.join(&path),
        None => path,
    }
}

/// Reads a boolean option: `false`, `0` and `no` (case-insensitive) mean
/// `false`, any other present value means `true`, absence yields `default`.
fn flag_option(options: &FdwOptions, key: &str, default: bool) -> bool {
    options
        .get(key)
        .map(|s| !matches!(s.to_ascii_lowercase().as_str(), "false" | "0" | "no"))
        .unwrap_or(default)
}

/// Reads a single-byte option: the first byte of the value, or `default`
/// when the option is absent or empty.
fn first_byte_option(options: &FdwOptions, key: &str, default: u8) -> u8 {
    options
        .get(key)
        .and_then(|s| s.as_bytes().first().copied())
        .unwrap_or(default)
}
/// Splits CSV text into records of string fields.
///
/// Rules implemented here:
/// - `delimiter` separates fields; `\n`, `\r` and `\r\n` each end a record.
/// - A field whose first byte is `quote` is quoted: delimiters and line
///   breaks inside it are literal, and a doubled quote stands for one quote.
/// - A final record without a trailing line break is still returned.
///
/// The input is scanned byte-wise, but field content is accumulated as raw
/// bytes and converted back to `String` as a whole, so multi-byte UTF-8
/// characters survive intact.
///
/// # Errors
///
/// Returns `Err` when the input ends inside a quoted field, or when a field
/// is not valid UTF-8 — which can only happen if `delimiter` or `quote` is a
/// non-ASCII byte that split a multi-byte character.
fn parse_csv_records(input: &str, delimiter: u8, quote: u8) -> Result<Vec<Vec<String>>, String> {
    // Turns the accumulated bytes of one field into a `String`, leaving the
    // buffer empty for the next field.
    fn finish_field(buf: &mut Vec<u8>) -> Result<String, String> {
        String::from_utf8(std::mem::take(buf)).map_err(|_| {
            "field is not valid UTF-8 (delimiter and quote must be ASCII)".to_string()
        })
    }

    let bytes = input.as_bytes();
    let mut records: Vec<Vec<String>> = Vec::new();
    let mut current_row: Vec<String> = Vec::new();
    let mut field: Vec<u8> = Vec::new();
    let mut in_quotes = false;
    let mut i = 0usize;

    while i < bytes.len() {
        let b = bytes[i];
        if in_quotes {
            if b != quote {
                field.push(b);
                i += 1;
            } else if bytes.get(i + 1) == Some(&quote) {
                // Doubled quote inside a quoted field: one literal quote.
                field.push(quote);
                i += 2;
            } else {
                in_quotes = false;
                i += 1;
            }
        } else if b == quote && field.is_empty() {
            // A quote only opens a quoted section at the start of a field.
            in_quotes = true;
            i += 1;
        } else if b == delimiter {
            current_row.push(finish_field(&mut field)?);
            i += 1;
        } else if b == b'\r' {
            current_row.push(finish_field(&mut field)?);
            records.push(std::mem::take(&mut current_row));
            i += 1;
            // Treat "\r\n" as a single line break.
            if bytes.get(i) == Some(&b'\n') {
                i += 1;
            }
        } else if b == b'\n' {
            current_row.push(finish_field(&mut field)?);
            records.push(std::mem::take(&mut current_row));
            i += 1;
        } else {
            field.push(b);
            i += 1;
        }
    }

    if in_quotes {
        return Err("unterminated quoted field".to_string());
    }
    // Flush a last record that was not terminated by a line break.
    if !field.is_empty() || !current_row.is_empty() {
        current_row.push(finish_field(&mut field)?);
        records.push(current_row);
    }
    Ok(records)
}
/// Infers a typed `Value` from the raw text of one CSV field.
///
/// Checked in order:
/// 1. an empty field becomes `Value::Null` when `treat_empty_as_null` is set;
/// 2. text without `.`, `e` or `E` that parses as `i64` becomes an integer;
/// 3. text containing `.`, `e` or `E` that parses as `f64` becomes a float;
/// 4. `true` / `false` (ASCII case-insensitive) become booleans;
/// 5. everything else is kept as text.
fn coerce_field(raw: &str, treat_empty_as_null: bool) -> Value {
    if raw.is_empty() && treat_empty_as_null {
        return Value::Null;
    }

    // The presence of a decimal point or exponent marker decides which
    // numeric interpretation is attempted; the two are mutually exclusive.
    let float_like = raw.chars().any(|c| matches!(c, '.' | 'e' | 'E'));
    if float_like {
        if let Ok(float) = raw.parse::<f64>() {
            return Value::Float(float);
        }
    } else if let Ok(int) = raw.parse::<i64>() {
        return Value::Integer(int);
    }

    for &(literal, flag) in [("true", true), ("false", false)].iter() {
        if raw.eq_ignore_ascii_case(literal) {
            return Value::Boolean(flag);
        }
    }

    Value::text(raw.to_string())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::fdw::FdwOptions;

    /// CSV fixture written to the system temp directory.
    ///
    /// The file name includes the process id so concurrent test processes do
    /// not overwrite each other's fixtures, and the file is deleted when the
    /// guard is dropped (including when a test assertion panics).
    struct TempCsv {
        path: PathBuf,
    }

    impl TempCsv {
        /// Writes `contents` to a fresh fixture file tagged with `name`.
        fn create(name: &str, contents: &str) -> Self {
            let file_name = format!("fdw_csv_{name}_{}.csv", std::process::id());
            let path = std::env::temp_dir().join(file_name);
            std::fs::write(&path, contents).expect("write temp csv");
            Self { path }
        }

        /// Table options whose `path` points at this fixture (absolute path).
        fn options(&self) -> FdwOptions {
            let mut opts = FdwOptions::new();
            opts.values
                .insert("path".to_string(), self.path.display().to_string());
            opts
        }
    }

    impl Drop for TempCsv {
        fn drop(&mut self) {
            // Best-effort cleanup; a failure to delete must not fail the test.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    #[test]
    fn scans_csv_with_header() {
        let csv = TempCsv::create("basic", "id,name,age\n1,Alice,30\n2,Bob,25\n");
        let wrapper = CsvForeignWrapper;
        let server_state = wrapper.build_server_state(&FdwOptions::new()).unwrap();
        let rows = wrapper.scan(server_state.as_ref(), &csv.options()).unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].get("id"), Some(&Value::Integer(1)));
        assert_eq!(rows[0].get("name"), Some(&Value::text("Alice".to_string())));
    }

    #[test]
    fn scans_with_base_path() {
        let csv = TempCsv::create("base", "a,b\n1,2\n");
        let wrapper = CsvForeignWrapper;
        let base = csv.path.parent().unwrap().to_path_buf();
        let server_state = wrapper
            .build_server_state(&FdwOptions::new().with("base_path", &base.display().to_string()))
            .unwrap();
        // Only the file name is given, so the wrapper must join it onto `base_path`.
        let mut opts = FdwOptions::new();
        opts.values.insert(
            "path".to_string(),
            csv.path.file_name().unwrap().to_string_lossy().into_owned(),
        );
        let rows = wrapper.scan(server_state.as_ref(), &opts).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get("a"), Some(&Value::Integer(1)));
    }

    #[test]
    fn custom_delimiter_and_quote() {
        let csv = TempCsv::create("sep", "id;note\n1;hello\n2;world\n");
        let wrapper = CsvForeignWrapper;
        let server_state = wrapper.build_server_state(&FdwOptions::new()).unwrap();
        let mut opts = csv.options();
        opts.values.insert("delimiter".to_string(), ";".to_string());
        let rows = wrapper.scan(server_state.as_ref(), &opts).unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[1].get("note"), Some(&Value::text("world".to_string())));
    }

    #[test]
    fn headerless_rows_use_positional_names() {
        let csv = TempCsv::create("nohdr", "1,2\n");
        let wrapper = CsvForeignWrapper;
        let server_state = wrapper.build_server_state(&FdwOptions::new()).unwrap();
        let mut opts = csv.options();
        opts.values.insert("header".to_string(), "false".to_string());
        let rows = wrapper.scan(server_state.as_ref(), &opts).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get("c0"), Some(&Value::Integer(1)));
        assert_eq!(rows[0].get("c1"), Some(&Value::Integer(2)));
    }

    #[test]
    fn estimates_at_least_one_row() {
        let csv = TempCsv::create("estimate", "a,b\n1,2\n");
        let wrapper = CsvForeignWrapper;
        let server_state = wrapper.build_server_state(&FdwOptions::new()).unwrap();
        assert_eq!(
            wrapper.estimated_row_count(server_state.as_ref(), &csv.options()),
            Some(1)
        );
        // Without a `path` option there is nothing to measure.
        assert_eq!(
            wrapper.estimated_row_count(server_state.as_ref(), &FdwOptions::new()),
            None
        );
    }

    #[test]
    fn missing_path_option_errors() {
        let wrapper = CsvForeignWrapper;
        let server_state = wrapper.build_server_state(&FdwOptions::new()).unwrap();
        let err = wrapper
            .scan(server_state.as_ref(), &FdwOptions::new())
            .unwrap_err();
        assert!(matches!(err, FdwError::MissingOption(_)));
    }
}