Skip to main content

reddb_server/storage/fdw/
csv.rs

1//! CSV foreign-data wrapper (Phase 3.2 PG parity).
2//!
//! Exposes a CSV file on local disk as a read-only foreign table. The
//! wrapper mirrors the RFC 4180 parsing + type coercion of
//! `storage::import::csv` (through local helpers in this module) so
//! behaviour is intended to match `COPY FROM 'path'`.
6//!
7//! # Options (table-level)
8//!
9//! | Option            | Required | Default | Description                   |
10//! |-------------------|:--------:|---------|-------------------------------|
11//! | `path`            | yes      | —       | Absolute or relative file path|
12//! | `delimiter`       | no       | `,`     | Single byte field separator   |
13//! | `header`          | no       | `true`  | Whether first row is names    |
14//! | `quote`           | no       | `"`     | Quote character               |
15//! | `treat_empty_as_null` | no   | `true`  | Empty field → Value::Null     |
16//!
17//! A server-level `base_path` option is prepended when `path` is
18//! relative, letting callers create one server per directory and many
19//! foreign tables pointing at different files inside it.
20
21use std::path::PathBuf;
22
23use super::{FdwError, FdwOptions, ForeignDataWrapper, WrapperState};
24use crate::storage::import::csv::{CsvConfig, CsvImporter};
25use crate::storage::query::unified::UnifiedRecord;
26use crate::storage::schema::Value;
27
/// Foreign-data wrapper that serves CSV files as read-only tables.
///
/// The CSV wrapper registers under the kind `"csv"`:
/// `CREATE SERVER srv FOREIGN DATA WRAPPER csv OPTIONS (base_path '/data/csv')`.
///
/// The type carries no data of its own; per-server configuration lives in
/// the state object produced by `build_server_state`.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvForeignWrapper;
31
/// Server-level cached state. Only `base_path` lives here today;
/// delimiter / quote / header handling are per-table options and are read
/// from the table options on every scan.
#[derive(Debug, Default)]
struct CsvServerState {
    /// Directory joined in front of relative table `path` options; `None`
    /// when the server was created without a `base_path` option.
    base_path: Option<PathBuf>,
}
37
38impl WrapperState for CsvServerState {
39    fn as_any(&self) -> &dyn std::any::Any {
40        self
41    }
42}
43
44impl ForeignDataWrapper for CsvForeignWrapper {
45    fn kind(&self) -> &'static str {
46        "csv"
47    }
48
49    fn build_server_state(
50        &self,
51        options: &FdwOptions,
52    ) -> Result<Option<std::sync::Arc<dyn WrapperState>>, FdwError> {
53        let base_path = options.get("base_path").map(PathBuf::from);
54        Ok(Some(std::sync::Arc::new(CsvServerState { base_path })))
55    }
56
57    fn scan(
58        &self,
59        server_state: Option<&std::sync::Arc<dyn WrapperState>>,
60        table_options: &FdwOptions,
61    ) -> Result<Vec<UnifiedRecord>, FdwError> {
62        // Resolve file path: table-level `path` + optional server-level
63        // `base_path` prefix when the table path is relative.
64        let rel = table_options.require("path")?;
65        let mut path = PathBuf::from(rel);
66        if path.is_relative() {
67            if let Some(state) = server_state {
68                if let Some(css) = state.as_any().downcast_ref::<CsvServerState>() {
69                    if let Some(base) = &css.base_path {
70                        path = base.join(&path);
71                    }
72                }
73            }
74        }
75
76        // Translate FDW options → CsvConfig.
77        let delimiter = table_options
78            .get("delimiter")
79            .and_then(|s| s.as_bytes().first().copied())
80            .unwrap_or(b',');
81        let quote = table_options
82            .get("quote")
83            .and_then(|s| s.as_bytes().first().copied())
84            .unwrap_or(b'"');
85        let has_header = table_options
86            .get("header")
87            .map(|s| !matches!(s.to_ascii_lowercase().as_str(), "false" | "0" | "no"))
88            .unwrap_or(true);
89        let treat_empty_as_null = table_options
90            .get("treat_empty_as_null")
91            .map(|s| !matches!(s.to_ascii_lowercase().as_str(), "false" | "0" | "no"))
92            .unwrap_or(true);
93
94        // Parse the file directly into memory (CsvImporter's import_reader
95        // normally inserts into a Store; here we parse then wrap records).
96        let text = std::fs::read_to_string(&path)
97            .map_err(|e| FdwError::Io(format!("read '{}': {e}", path.display())))?;
98        let records = parse_csv_records(&text, delimiter, quote).map_err(FdwError::Io)?;
99
100        let mut iter = records.into_iter();
101        let headers: Vec<String> = if has_header {
102            match iter.next() {
103                Some(row) => row,
104                None => return Ok(Vec::new()),
105            }
106        } else {
107            Vec::new()
108        };
109
110        let mut out: Vec<UnifiedRecord> = Vec::new();
111        for row in iter {
112            let names: Vec<String> = if headers.is_empty() {
113                (0..row.len()).map(|i| format!("c{i}")).collect()
114            } else {
115                headers.clone()
116            };
117            let mut record = UnifiedRecord::with_capacity(row.len());
118            for (i, field) in row.into_iter().enumerate() {
119                let name = names.get(i).cloned().unwrap_or_else(|| format!("c{i}"));
120                let value = coerce_field(&field, treat_empty_as_null);
121                record.set(&name, value);
122            }
123            out.push(record);
124        }
125        Ok(out)
126    }
127
128    fn estimated_row_count(
129        &self,
130        server_state: Option<&std::sync::Arc<dyn WrapperState>>,
131        table_options: &FdwOptions,
132    ) -> Option<usize> {
133        // Cheap estimate: file bytes / average row length (128 bytes).
134        // Doesn't open the file twice — `scan` re-reads on demand.
135        let rel = table_options.get("path")?;
136        let mut path = PathBuf::from(rel);
137        if path.is_relative() {
138            if let Some(state) = server_state {
139                if let Some(css) = state.as_any().downcast_ref::<CsvServerState>() {
140                    if let Some(base) = &css.base_path {
141                        path = base.join(&path);
142                    }
143                }
144            }
145        }
146        std::fs::metadata(&path)
147            .ok()
148            .map(|m| (m.len() / 128).max(1) as usize)
149    }
150}
151
152// ────────────────────────────────────────────────────────────────────
153// Local parser + coercion helpers
154//
155// We duplicate minimal scaffolding here (rather than re-export from
156// storage::import::csv) to keep FDW independent of the importer's
157// Store-writing path. Both trees stay consistent via shared expectations
158// in tests.
159// ────────────────────────────────────────────────────────────────────
160
/// Splits `input` into records of raw string fields, RFC 4180 style.
///
/// * `delimiter` separates fields.
/// * `quote` opens a quoted field only when it is the first byte of the
///   field. Inside quotes a doubled quote yields one literal quote, and
///   delimiters / line breaks are kept verbatim.
/// * A record ends at `\n`, `\r\n` or a lone `\r`. A final record without a
///   trailing line break is still emitted.
///
/// Fields are accumulated as raw bytes and decoded as UTF-8 once complete,
/// so multi-byte characters survive intact (pushing each byte as its own
/// `char` would turn "é" into "Ã©"). If a non-ASCII `delimiter` / `quote`
/// byte splits a character in half, the broken bytes are replaced with
/// U+FFFD instead of failing.
///
/// # Errors
/// Returns `Err("unterminated quoted field")` when the input ends while a
/// quoted field is still open.
fn parse_csv_records(input: &str, delimiter: u8, quote: u8) -> Result<Vec<Vec<String>>, String> {
    /// Drains `buf` into a `String`, leaving `buf` empty for the next field.
    fn finish_field(buf: &mut Vec<u8>) -> String {
        String::from_utf8(std::mem::take(buf))
            .unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned())
    }

    let bytes = input.as_bytes();
    let mut records: Vec<Vec<String>> = Vec::new();
    let mut current_row: Vec<String> = Vec::new();
    let mut field: Vec<u8> = Vec::new();
    let mut in_quotes = false;
    let mut i = 0usize;

    while i < bytes.len() {
        let b = bytes[i];
        i += 1;
        if in_quotes {
            if b != quote {
                field.push(b);
            } else if bytes.get(i) == Some(&quote) {
                // Doubled quote inside a quoted field: one literal quote.
                field.push(quote);
                i += 1;
            } else {
                in_quotes = false;
            }
        } else if b == quote && field.is_empty() {
            in_quotes = true;
        } else if b == delimiter {
            current_row.push(finish_field(&mut field));
        } else if b == b'\r' || b == b'\n' {
            current_row.push(finish_field(&mut field));
            records.push(std::mem::take(&mut current_row));
            // Treat CRLF as a single record terminator.
            if b == b'\r' && bytes.get(i) == Some(&b'\n') {
                i += 1;
            }
        } else {
            field.push(b);
        }
    }

    if in_quotes {
        return Err("unterminated quoted field".to_string());
    }
    // Flush a final record that was not terminated by a line break.
    if !field.is_empty() || !current_row.is_empty() {
        current_row.push(finish_field(&mut field));
        records.push(current_row);
    }
    Ok(records)
}
214
215fn coerce_field(raw: &str, treat_empty_as_null: bool) -> Value {
216    if treat_empty_as_null && raw.is_empty() {
217        return Value::Null;
218    }
219    if let Ok(n) = raw.parse::<i64>() {
220        if !raw.contains('.') && !raw.contains('e') && !raw.contains('E') {
221            return Value::Integer(n);
222        }
223    }
224    if let Ok(f) = raw.parse::<f64>() {
225        if raw.contains('.') || raw.contains('e') || raw.contains('E') {
226            return Value::Float(f);
227        }
228    }
229    if raw.eq_ignore_ascii_case("true") {
230        return Value::Boolean(true);
231    }
232    if raw.eq_ignore_ascii_case("false") {
233        return Value::Boolean(false);
234    }
235    Value::text(raw.to_string())
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::fdw::FdwOptions;

    /// Writes `contents` to a CSV file in the system temp directory and
    /// returns its path. The process id is part of the file name so that
    /// concurrent test processes do not overwrite each other's fixtures.
    fn tmp_path(name: &str, contents: &str) -> PathBuf {
        let file_name = format!("fdw_csv_{name}_{}.csv", std::process::id());
        let path = std::env::temp_dir().join(file_name);
        std::fs::write(&path, contents).expect("write temp csv");
        path
    }

    /// Best-effort removal of a fixture created by `tmp_path`.
    fn cleanup(path: &PathBuf) {
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn scans_csv_with_header() {
        let path = tmp_path("basic", "id,name,age\n1,Alice,30\n2,Bob,25\n");
        let wrapper = CsvForeignWrapper;
        let server_state = wrapper.build_server_state(&FdwOptions::new()).unwrap();
        let mut opts = FdwOptions::new();
        opts.values
            .insert("path".to_string(), path.display().to_string());
        let rows = wrapper.scan(server_state.as_ref(), &opts).unwrap();
        cleanup(&path);
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].get("id"), Some(&Value::Integer(1)));
        assert_eq!(rows[0].get("name"), Some(&Value::text("Alice".to_string())));
    }

    #[test]
    fn scans_with_base_path() {
        let path = tmp_path("base", "a,b\n1,2\n");
        let wrapper = CsvForeignWrapper;
        let base = path.parent().unwrap().to_path_buf();
        let server_state = wrapper
            .build_server_state(&FdwOptions::new().with("base_path", &base.display().to_string()))
            .unwrap();
        let mut opts = FdwOptions::new();
        // Only the file name is passed, so the scan must join it onto the
        // server-level base path.
        opts.values.insert(
            "path".to_string(),
            path.file_name().unwrap().to_string_lossy().into_owned(),
        );
        let rows = wrapper.scan(server_state.as_ref(), &opts).unwrap();
        cleanup(&path);
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get("a"), Some(&Value::Integer(1)));
    }

    #[test]
    fn custom_delimiter_and_quote() {
        // The first data row embeds the delimiter inside a quoted field, so
        // both the `delimiter` and the `quote` option are exercised.
        let path = tmp_path("sep", "id;note\n1;'hello;there'\n2;world\n");
        let wrapper = CsvForeignWrapper;
        let server_state = wrapper.build_server_state(&FdwOptions::new()).unwrap();
        let mut opts = FdwOptions::new();
        opts.values
            .insert("path".to_string(), path.display().to_string());
        opts.values.insert("delimiter".to_string(), ";".to_string());
        opts.values.insert("quote".to_string(), "'".to_string());
        let rows = wrapper.scan(server_state.as_ref(), &opts).unwrap();
        cleanup(&path);
        assert_eq!(rows.len(), 2);
        assert_eq!(
            rows[0].get("note"),
            Some(&Value::text("hello;there".to_string()))
        );
        assert_eq!(rows[1].get("note"), Some(&Value::text("world".to_string())));
    }

    #[test]
    fn missing_path_option_errors() {
        let wrapper = CsvForeignWrapper;
        let server_state = wrapper.build_server_state(&FdwOptions::new()).unwrap();
        let err = wrapper
            .scan(server_state.as_ref(), &FdwOptions::new())
            .unwrap_err();
        assert!(matches!(err, FdwError::MissingOption(_)));
    }
}
305}