gistools/readers/csv/
mod.rs

1use crate::parsers::{FeatureReader, Reader};
2use alloc::{
3    collections::{BTreeMap, VecDeque},
4    string::{String, ToString},
5    vec,
6    vec::Vec,
7};
8use core::{cell::RefCell, marker::PhantomData};
9use s2json::{
10    MValue, MValueCompatible, PrimitiveValue, Properties, VectorFeature, VectorGeometry,
11    VectorPoint,
12};
13use serde::de::DeserializeOwned;
14
/// User defined options on how to parse the CSV file
#[derive(Debug, Default)]
pub struct CSVReaderOptions {
    /// The delimiter separating values (fields) within a line [Default: `","`]
    pub delimiter: Option<char>,
    /// The line_delimiter to use to separate lines [Default: `"\n"`]
    pub line_delimiter: Option<char>,
    /// Name of the header field holding the longitude (x coordinate) [Default: `"lon"`]
    pub lon_key: Option<String>,
    /// Name of the header field holding the latitude (y coordinate) [Default: `"lat"`]
    pub lat_key: Option<String>,
    /// Name of the header field holding the height (z coordinate), if any [Default: `None`]
    pub height_key: Option<String>,
}
29
/// Incremental parsing state kept between successive reads of the CSV input.
#[derive(Debug, Clone)]
struct CSVParser {
    /// `true` until the header line has been consumed
    first_line: bool,
    /// Header field names, each trimmed of surrounding whitespace
    fields: Vec<String>,
    /// Next offset in the input to read from
    offset: u64,
    /// Offset at which reading stops (exclusive)
    end: u64,
    /// Text after the last line delimiter seen so far (a line not yet known to be complete)
    partial_line: String,
    /// Complete lines waiting to be parsed
    parsed_lines: VecDeque<String>,
}
impl CSVParser {
    /// Create a parser that starts at offset `0`, expects a header first, and stops at `end`.
    pub fn new(end: u64) -> Self {
        Self {
            end,
            offset: 0,
            first_line: true,
            fields: Vec::new(),
            partial_line: String::new(),
            parsed_lines: VecDeque::new(),
        }
    }

    /// Record the header: split `line` on `delimiter` and keep each trimmed name.
    pub fn parse_first_line(&mut self, line: &str, delimiter: char) {
        self.fields = line.split(delimiter).map(str::trim).map(String::from).collect();
    }
}
56
57/// # CSV Reader
58///
59/// ## Description
60/// Parse (Geo|S2)JSON from a file that is in the CSV format
61///
62/// Implements the [`FeatureReader`] trait
63///
64/// ## Usage
65///
66/// ### File Reader
67/// ```rust
68/// use gistools::{
69///     parsers::{FeatureReader, FileReader},
70///     readers::{CSVReader, CSVReaderOptions},
71/// };
72/// use s2json::{MValue, VectorFeature};
73/// use serde::{Deserialize, Serialize};
74/// use std::path::PathBuf;
75///
76/// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
77/// path.push("tests/readers/csv/fixtures/basic.csv");
78///
79/// #[derive(Debug, Default, Clone, PartialEq, MValue, Serialize, Deserialize)]
80/// struct Test {
81///     name: String,
82/// }
83///
84/// let reader = CSVReader::new(FileReader::from(path), None);
85/// let features: Vec<VectorFeature<(), Test, MValue>> = reader.iter().collect();
86/// ```
87///
88/// ### Buffer Reader
89/// ```rust
90/// use gistools::{
91///     parsers::{FeatureReader, BufferReader},
92///     readers::{CSVReader, CSVReaderOptions},
93/// };
94/// use s2json::{MValue, VectorFeature};
95/// use serde::{Deserialize, Serialize};
96///
97/// // Ignore this setup, just ensures a passing test.
98/// use std::path::PathBuf;
99/// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
100/// path.push("tests/readers/csv/fixtures/basic.csv");
101/// let data = std::fs::read(path).unwrap();
102///
103/// #[derive(Debug, Default, Clone, PartialEq, MValue, Serialize, Deserialize)]
104/// struct Test {
105///     name: String,
106/// }
107///
108/// let reader = CSVReader::new(BufferReader::from(data), None);
109/// let features: Vec<VectorFeature<(), Test, MValue>> = reader.iter().collect();
110/// ```
111///
112/// ## Links
113/// - <https://en.wikipedia.org/wiki/Comma-separated_values>
114/// - <https://cesium.com/blog/2015/04/07/quadtree-cheatseet/>
115#[derive(Debug, Clone)]
116pub struct CSVReader<T: Reader, P: MValueCompatible + DeserializeOwned = MValue> {
117    reader: T,
118    delimiter: char,
119    line_delimiter: char,
120    lon_key: String,
121    lat_key: String,
122    height_key: Option<String>,
123    parser: RefCell<CSVParser>,
124    _phantom: PhantomData<VectorFeature<(), P, ()>>,
125}
126impl<T: Reader, P: MValueCompatible + DeserializeOwned> CSVReader<T, P> {
127    /// Create a new CSVReader
128    ///
129    /// ## Parameters
130    /// - `input`: the input data to parse from
131    /// - `options`: user defined options on how to parse the CSV file
132    ///
133    /// ## Returns
134    /// A new [`CSVReader`]
135    pub fn new(reader: T, options: Option<CSVReaderOptions>) -> CSVReader<T, P> {
136        let options = options.unwrap_or_default();
137        let len = reader.len();
138        CSVReader {
139            reader,
140            delimiter: options.delimiter.unwrap_or(','),
141            line_delimiter: options.line_delimiter.unwrap_or('\n'),
142            lon_key: options.lon_key.unwrap_or("lon".into()),
143            lat_key: options.lat_key.unwrap_or("lat".into()),
144            height_key: options.height_key,
145            parser: RefCell::new(CSVParser::new(len)),
146            _phantom: PhantomData,
147        }
148    }
149
150    /// Set a new position in the file for parallel processing
151    pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
152        // FIRST we run next_feature just to set the contents of the parser (specifically the fields)
153        {
154            *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
155            self.next_feature();
156        }
157        // setup chunk size, start and end
158        let len = self.reader.len();
159        let chunk_size = len.div_ceil(pool_size);
160        let mut start = thread_id.saturating_mul(chunk_size);
161        let mut end = u64::min(start + chunk_size, len);
162        // align to separators
163        if thread_id > 0 {
164            start = align_to_line_delimiter(&self.reader, start, end, self.line_delimiter);
165        }
166        if thread_id < pool_size - 1 {
167            end = align_to_line_delimiter(&self.reader, end, len, self.line_delimiter);
168        }
169        // update parser
170        let mut parser = self.parser.borrow_mut();
171        if thread_id == 0 {
172            *parser = CSVParser::new(end);
173        } else {
174            parser.partial_line.clear();
175            parser.parsed_lines.clear();
176        }
177        parser.offset = start;
178        parser.end = end;
179    }
180
181    /// Grab the next feature if it exists
182    pub fn next_feature(&self) -> Option<VectorFeature<(), P, MValue>> {
183        let mut parser = self.parser.borrow_mut();
184        // Keep returning from the queue if there's data
185        while let Some(line) = parser.parsed_lines.pop_front() {
186            let trimmed = line.trim();
187            if trimmed.is_empty() || trimmed.starts_with('#') {
188                continue;
189            }
190            if parser.first_line {
191                parser.parse_first_line(trimmed, self.delimiter);
192                parser.first_line = false;
193            } else {
194                return Some(self.parse_line(trimmed, &parser.fields));
195            }
196        }
197
198        // Read more if we're not done
199        if parser.offset < parser.end {
200            let length = u64::min(65_536, parser.end - parser.offset);
201            let chunk = parser.partial_line.clone()
202                + &self.reader.parse_string(Some(parser.offset), Some(length));
203            parser.offset += length;
204
205            parser.partial_line.clear();
206            let mut lines = chunk
207                .split(self.line_delimiter)
208                .map(str::to_string)
209                .filter(|s| !s.is_empty())
210                .collect::<Vec<_>>();
211
212            if let Some(last) = lines.pop() {
213                parser.partial_line = last;
214            }
215
216            parser.parsed_lines.extend(lines);
217            // drop the parser before calling next_feature
218            drop(parser);
219            return self.next_feature(); // recurse now that buffer is filled
220        }
221
222        // Final line after file ends
223        if !parser.partial_line.is_empty() {
224            let line = core::mem::take(&mut parser.partial_line);
225            let trimmed = line.trim();
226            if trimmed.is_empty() || trimmed.starts_with('#') {
227                return None;
228            }
229            if parser.first_line {
230                parser.parse_first_line(trimmed, self.delimiter);
231                parser.first_line = false;
232                drop(parser);
233                return self.next_feature(); // recurse again to skip header
234            } else {
235                return Some(self.parse_line(trimmed, &parser.fields));
236            }
237        }
238
239        None
240    }
241
242    /// given a line, parse the values mapped to the first lines fields
243    /// returns a GeoJSON Vector Feature
244    fn parse_line(&self, line: &str, fields: &[String]) -> VectorFeature<(), P, MValue> {
245        let values: Vec<String> =
246            line.split(self.delimiter).map(|v| v.trim().to_string()).collect();
247        let mut properties = Properties::new();
248        let mut coordinates: VectorPoint<MValue> = VectorPoint::default();
249
250        for (value, field) in values.iter().zip(fields.iter()) {
251            if field.is_empty() || value.is_empty() {
252                continue;
253            }
254
255            let value_num: f64 = value.parse().unwrap_or(0.0);
256            if *field == self.lon_key {
257                coordinates.x = value_num;
258            } else if *field == self.lat_key {
259                coordinates.y = value_num;
260            } else if Some(field) == self.height_key.as_ref() {
261                coordinates.z = Some(value_num);
262            } else {
263                properties.insert(field.clone(), value.into());
264            }
265        }
266        if coordinates.x.is_nan() || coordinates.y.is_nan() {
267            panic!("coordinates must be finite numbers");
268        }
269
270        VectorFeature {
271            _type: "VectorFeature".into(),
272            geometry: VectorGeometry::new_point(coordinates, None),
273            properties: (&properties).into(),
274            ..Default::default()
275        }
276    }
277}
278impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVReader<T, P> {
279    type Item = VectorFeature<(), P, MValue>;
280    fn next(&mut self) -> Option<Self::Item> {
281        self.next_feature()
282    }
283}
284/// The CSV Iterator tool
285#[derive(Debug, Clone)]
286pub struct CSVIterator<'a, T: Reader, P: MValueCompatible + DeserializeOwned> {
287    reader: &'a CSVReader<T, P>,
288}
289impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVIterator<'_, T, P> {
290    type Item = VectorFeature<(), P, MValue>;
291
292    fn next(&mut self) -> Option<Self::Item> {
293        self.reader.next_feature()
294    }
295}
296/// A feature reader trait with a callback-based approach
297impl<T: Reader, P: MValueCompatible + DeserializeOwned> FeatureReader<(), P, MValue>
298    for CSVReader<T, P>
299{
300    type FeatureIterator<'a>
301        = CSVIterator<'a, T, P>
302    where
303        T: 'a,
304        P: 'a;
305
306    fn iter(&self) -> Self::FeatureIterator<'_> {
307        *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
308        CSVIterator { reader: self }
309    }
310
311    fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
312        *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
313        self.par_seek(pool_size as u64, thread_id as u64);
314        CSVIterator { reader: self }
315    }
316}
317
318/// Parse CSV data into a record
319/// the source is the source of the CSV data
320/// the delimiter is the character used to separate fields
321/// the line_delimiter is the character used to separate lines
322/// returns an object with key-value pairs whose keys and values are both strings
323///
324/// Example:
325/// ```rust
326/// use gistools::readers::parse_csv_as_record;
327/// use s2json::MValue;
328///
329/// #[derive(Debug, Default, Clone, PartialEq, MValue)]
330/// struct Test {
331///     a: String,
332///     b: String,
333///     c: String,
334/// }
335/// let source = "a,b,c\n1,2,3\n4,5,6";
336/// let res = parse_csv_as_record::<Test>(source, None, None);
337///
338/// assert_eq!(
339///     res,
340///     vec![
341///         Test { a: "1".into(), b: "2".into(), c: "3".into() },
342///         Test { a: "4".into(), b: "5".into(), c: "6".into() },
343///     ]
344/// );
345/// ```
346pub fn parse_csv_as_record<T: MValueCompatible>(
347    source: &str,
348    delimiter: Option<char>,
349    line_delimiter: Option<char>,
350) -> Vec<T> {
351    let delimiter = delimiter.unwrap_or(',');
352    let line_delimiter = line_delimiter.unwrap_or('\n');
353    let mut res = vec![];
354    let lines: Vec<&str> = source.split(line_delimiter).collect();
355    let header = parse_csv_line(lines[0], delimiter);
356
357    for raw_line in lines.iter().skip(1) {
358        let line = raw_line.trim();
359        if line.is_empty() {
360            continue;
361        }
362
363        let mut record = MValue::new();
364        let values = parse_csv_line(line, delimiter);
365
366        for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
367            let val =
368                if value.trim().is_empty() { (&PrimitiveValue::Null).into() } else { value.into() };
369            record.insert(header.into(), val);
370        }
371
372        res.push(record.into());
373    }
374
375    res
376}
377
378/// Parse CSV data into a BTreeMap record where both the keys and values are strings
379pub fn parse_csv_as_btree(
380    source: &str,
381    delimiter: Option<char>,
382    line_delimiter: Option<char>,
383) -> Vec<BTreeMap<String, String>> {
384    let delimiter = delimiter.unwrap_or(',');
385    let line_delimiter = line_delimiter.unwrap_or('\n');
386    let mut res = vec![];
387    let lines: Vec<&str> = source.split(line_delimiter).collect();
388    let header = parse_csv_line(lines[0], delimiter);
389
390    for raw_line in lines.iter().skip(1) {
391        let line = raw_line.trim();
392        if line.is_empty() {
393            continue;
394        }
395
396        let mut record = BTreeMap::new();
397        let values = parse_csv_line(line, delimiter);
398
399        for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
400            let val = value.trim();
401            if val.is_empty() {
402                continue;
403            }
404            record.insert(header.clone(), val.to_string());
405        }
406
407        res.push(record);
408    }
409
410    res
411}
412
/// Parses a line of a CSV file into a vector of values split by the delimiter.
///
/// - A field may be wrapped in `"` or `'`. Inside the quotes the delimiter is literal, and a
///   doubled quote character (`""`) stands for one quote.
/// - A quote character only opens a quoted section at the start of a field, ignoring leading
///   whitespace. Elsewhere it is literal, so values like `O'Brien` are kept intact.
/// - Every field is trimmed of surrounding whitespace.
/// - A trailing delimiter yields a trailing empty field (`"1,2,"` has three fields).
/// - An empty line yields no fields.
pub fn parse_csv_line(line: &str, delimiter: char) -> Vec<String> {
    let mut result = Vec::new();
    if line.is_empty() {
        return result;
    }

    let mut current = String::new();
    // The quote character of the quoted section we are inside, if any.
    let mut quote_char: Option<char> = None;
    let mut chars = line.chars().peekable();

    while let Some(ch) = chars.next() {
        match quote_char {
            Some(q) if ch == q => {
                // A doubled quote inside a quoted section is an escaped literal quote.
                if chars.peek() == Some(&q) {
                    current.push(q);
                    chars.next();
                } else {
                    quote_char = None;
                }
            }
            Some(_) => current.push(ch),
            None if (ch == '"' || ch == '\'') && current.trim().is_empty() => {
                quote_char = Some(ch);
            }
            None if ch == delimiter => {
                result.push(current.trim().into());
                current.clear();
            }
            None => current.push(ch),
        }
    }

    // Push the final field, even when empty, so trailing empty values are kept.
    result.push(current.trim().into());
    result
}
455
456// Helper function to align to delimiters if using parallel processing
457fn align_to_line_delimiter<R: Reader>(reader: &R, mut pos: u64, end: u64, sep: char) -> u64 {
458    let sep_u8 = sep as u8;
459
460    while pos < end {
461        let len = u64::min(65_536, end - pos);
462        let chunk = reader.parse_string(Some(pos), Some(len));
463        if let Some(rel) = chunk.as_bytes().iter().position(|&b| b == sep_u8) {
464            return pos + rel as u64 + 1; // move past delimiter
465        }
466        pos += len;
467    }
468    end
469}