reddb_server/storage/fdw/
csv.rs1use std::path::PathBuf;
22
23use super::{FdwError, FdwOptions, ForeignDataWrapper, WrapperState};
24use crate::storage::import::csv::{CsvConfig, CsvImporter};
25use crate::storage::query::unified::UnifiedRecord;
26use crate::storage::schema::Value;
27
/// Foreign data wrapper that exposes a CSV file as a scannable table.
///
/// The wrapper itself is stateless: per-server configuration is built by
/// `build_server_state` and handed back to `scan` / `estimated_row_count`
/// on each call.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvForeignWrapper;
31
/// Per-server state for the CSV wrapper.
#[derive(Debug, Clone, Default)]
struct CsvServerState {
    /// Directory that relative table `path` options are joined onto.
    /// `None` means relative paths are used as given.
    base_path: Option<PathBuf>,
}
37
38impl WrapperState for CsvServerState {
39 fn as_any(&self) -> &dyn std::any::Any {
40 self
41 }
42}
43
44impl ForeignDataWrapper for CsvForeignWrapper {
45 fn kind(&self) -> &'static str {
46 "csv"
47 }
48
49 fn build_server_state(
50 &self,
51 options: &FdwOptions,
52 ) -> Result<Option<std::sync::Arc<dyn WrapperState>>, FdwError> {
53 let base_path = options.get("base_path").map(PathBuf::from);
54 Ok(Some(std::sync::Arc::new(CsvServerState { base_path })))
55 }
56
57 fn scan(
58 &self,
59 server_state: Option<&std::sync::Arc<dyn WrapperState>>,
60 table_options: &FdwOptions,
61 ) -> Result<Vec<UnifiedRecord>, FdwError> {
62 let rel = table_options.require("path")?;
65 let mut path = PathBuf::from(rel);
66 if path.is_relative() {
67 if let Some(state) = server_state {
68 if let Some(css) = state.as_any().downcast_ref::<CsvServerState>() {
69 if let Some(base) = &css.base_path {
70 path = base.join(&path);
71 }
72 }
73 }
74 }
75
76 let delimiter = table_options
78 .get("delimiter")
79 .and_then(|s| s.as_bytes().first().copied())
80 .unwrap_or(b',');
81 let quote = table_options
82 .get("quote")
83 .and_then(|s| s.as_bytes().first().copied())
84 .unwrap_or(b'"');
85 let has_header = table_options
86 .get("header")
87 .map(|s| !matches!(s.to_ascii_lowercase().as_str(), "false" | "0" | "no"))
88 .unwrap_or(true);
89 let treat_empty_as_null = table_options
90 .get("treat_empty_as_null")
91 .map(|s| !matches!(s.to_ascii_lowercase().as_str(), "false" | "0" | "no"))
92 .unwrap_or(true);
93
94 let text = std::fs::read_to_string(&path)
97 .map_err(|e| FdwError::Io(format!("read '{}': {e}", path.display())))?;
98 let records = parse_csv_records(&text, delimiter, quote).map_err(FdwError::Io)?;
99
100 let mut iter = records.into_iter();
101 let headers: Vec<String> = if has_header {
102 match iter.next() {
103 Some(row) => row,
104 None => return Ok(Vec::new()),
105 }
106 } else {
107 Vec::new()
108 };
109
110 let mut out: Vec<UnifiedRecord> = Vec::new();
111 for row in iter {
112 let names: Vec<String> = if headers.is_empty() {
113 (0..row.len()).map(|i| format!("c{i}")).collect()
114 } else {
115 headers.clone()
116 };
117 let mut record = UnifiedRecord::with_capacity(row.len());
118 for (i, field) in row.into_iter().enumerate() {
119 let name = names.get(i).cloned().unwrap_or_else(|| format!("c{i}"));
120 let value = coerce_field(&field, treat_empty_as_null);
121 record.set(&name, value);
122 }
123 out.push(record);
124 }
125 Ok(out)
126 }
127
128 fn estimated_row_count(
129 &self,
130 server_state: Option<&std::sync::Arc<dyn WrapperState>>,
131 table_options: &FdwOptions,
132 ) -> Option<usize> {
133 let rel = table_options.get("path")?;
136 let mut path = PathBuf::from(rel);
137 if path.is_relative() {
138 if let Some(state) = server_state {
139 if let Some(css) = state.as_any().downcast_ref::<CsvServerState>() {
140 if let Some(base) = &css.base_path {
141 path = base.join(&path);
142 }
143 }
144 }
145 }
146 std::fs::metadata(&path)
147 .ok()
148 .map(|m| (m.len() / 128).max(1) as usize)
149 }
150}
151
/// Splits `input` into CSV records, each a list of unescaped field strings.
///
/// Rules implemented here:
/// - Fields are separated by `delimiter`; records end at `\n`, `\r` or `\r\n`.
/// - A `quote` byte at the very start of a field opens a quoted section in
///   which delimiters and line breaks are literal; a doubled quote inside it
///   yields one literal quote byte.
/// - A final record without a trailing line break is still emitted; a trailing
///   line break does not produce an extra empty record.
///
/// Field content is accumulated as raw bytes and validated as UTF-8 when the
/// field ends, so multi-byte characters survive intact. With ASCII `delimiter`
/// and `quote` this validation cannot fail, because ASCII bytes never occur
/// inside a multi-byte UTF-8 sequence.
///
/// # Errors
/// Returns `Err` when a quoted section is never closed, or when a non-ASCII
/// `delimiter`/`quote` byte splits a multi-byte character.
fn parse_csv_records(input: &str, delimiter: u8, quote: u8) -> Result<Vec<Vec<String>>, String> {
    /// Moves the accumulated bytes out of `buf` and validates them as UTF-8.
    fn take_field(buf: &mut Vec<u8>) -> Result<String, String> {
        String::from_utf8(std::mem::take(buf)).map_err(|_| {
            "field is not valid UTF-8; delimiter and quote must be ASCII bytes".to_string()
        })
    }

    let bytes = input.as_bytes();
    let mut records = Vec::new();
    let mut current_row: Vec<String> = Vec::new();
    let mut field: Vec<u8> = Vec::new();
    let mut in_quotes = false;
    let mut i = 0usize;

    while i < bytes.len() {
        let b = bytes[i];
        if in_quotes {
            if b != quote {
                field.push(b);
                i += 1;
            } else if bytes.get(i + 1) == Some(&quote) {
                // Doubled quote inside a quoted section: one literal quote.
                field.push(quote);
                i += 2;
            } else {
                in_quotes = false;
                i += 1;
            }
        } else if b == quote && field.is_empty() {
            // A quote only opens a quoted section at the start of a field.
            in_quotes = true;
            i += 1;
        } else if b == delimiter {
            current_row.push(take_field(&mut field)?);
            i += 1;
        } else if b == b'\r' || b == b'\n' {
            current_row.push(take_field(&mut field)?);
            records.push(std::mem::take(&mut current_row));
            i += 1;
            // Treat CRLF as a single line break.
            if b == b'\r' && bytes.get(i) == Some(&b'\n') {
                i += 1;
            }
        } else {
            field.push(b);
            i += 1;
        }
    }

    if in_quotes {
        return Err("unterminated quoted field".to_string());
    }
    // Flush the last record when the input does not end with a line break.
    if !field.is_empty() || !current_row.is_empty() {
        current_row.push(take_field(&mut field)?);
        records.push(current_row);
    }
    Ok(records)
}
214
215fn coerce_field(raw: &str, treat_empty_as_null: bool) -> Value {
216 if treat_empty_as_null && raw.is_empty() {
217 return Value::Null;
218 }
219 if let Ok(n) = raw.parse::<i64>() {
220 if !raw.contains('.') && !raw.contains('e') && !raw.contains('E') {
221 return Value::Integer(n);
222 }
223 }
224 if let Ok(f) = raw.parse::<f64>() {
225 if raw.contains('.') || raw.contains('e') || raw.contains('E') {
226 return Value::Float(f);
227 }
228 }
229 if raw.eq_ignore_ascii_case("true") {
230 return Value::Boolean(true);
231 }
232 if raw.eq_ignore_ascii_case("false") {
233 return Value::Boolean(false);
234 }
235 Value::text(raw.to_string())
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::fdw::FdwOptions;

    /// Writes `contents` to `fdw_csv_<name>.csv` in the OS temp directory and
    /// returns the path of the file.
    fn tmp_path(name: &str, contents: &str) -> PathBuf {
        let file = std::env::temp_dir().join(format!("fdw_csv_{name}.csv"));
        std::fs::write(&file, contents).expect("write temp csv");
        file
    }

    /// Builds table options from `(key, value)` pairs.
    fn table_options(pairs: &[(&str, String)]) -> FdwOptions {
        let mut options = FdwOptions::new();
        for (key, value) in pairs {
            options.values.insert((*key).to_string(), value.clone());
        }
        options
    }

    #[test]
    fn scans_csv_with_header() {
        let csv = tmp_path("basic", "id,name,age\n1,Alice,30\n2,Bob,25\n");
        let fdw = CsvForeignWrapper;
        let state = fdw.build_server_state(&FdwOptions::new()).unwrap();
        let options = table_options(&[("path", csv.display().to_string())]);

        let rows = fdw.scan(state.as_ref(), &options).unwrap();

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].get("id"), Some(&Value::Integer(1)));
        assert_eq!(rows[0].get("name"), Some(&Value::text("Alice".to_string())));
    }

    #[test]
    fn scans_with_base_path() {
        let csv = tmp_path("base", "a,b\n1,2\n");
        let fdw = CsvForeignWrapper;
        // Split the temp file into directory (server option) and file name
        // (table option) so the wrapper has to join them again.
        let dir = csv.parent().unwrap().to_path_buf();
        let file_name = csv.file_name().unwrap().to_string_lossy().into_owned();
        let state = fdw
            .build_server_state(&FdwOptions::new().with("base_path", &dir.display().to_string()))
            .unwrap();
        let options = table_options(&[("path", file_name)]);

        let rows = fdw.scan(state.as_ref(), &options).unwrap();

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get("a"), Some(&Value::Integer(1)));
    }

    #[test]
    fn custom_delimiter_and_quote() {
        let csv = tmp_path("sep", "id;note\n1;hello\n2;world\n");
        let fdw = CsvForeignWrapper;
        let state = fdw.build_server_state(&FdwOptions::new()).unwrap();
        let options = table_options(&[
            ("path", csv.display().to_string()),
            ("delimiter", ";".to_string()),
        ]);

        let rows = fdw.scan(state.as_ref(), &options).unwrap();

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[1].get("note"), Some(&Value::text("world".to_string())));
    }

    #[test]
    fn missing_path_option_errors() {
        let fdw = CsvForeignWrapper;
        let state = fdw.build_server_state(&FdwOptions::new()).unwrap();

        let err = fdw.scan(state.as_ref(), &FdwOptions::new()).unwrap_err();

        assert!(matches!(err, FdwError::MissingOption(_)));
    }
}