gistools/readers/csv/
mod.rs

1use crate::parsers::{FeatureReader, Reader};
2use alloc::{
3    collections::{BTreeMap, VecDeque},
4    string::{String, ToString},
5    vec,
6    vec::Vec,
7};
8use core::{cell::RefCell, marker::PhantomData};
9use s2json::{
10    MValue, MValueCompatible, PrimitiveValue, Properties, VectorFeature, VectorGeometry,
11    VectorPoint,
12};
13use serde::de::DeserializeOwned;
14
/// User defined options on how to parse the CSV file.
///
/// Every field is optional; a `None` falls back to the default listed on that field.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CSVReaderOptions {
    /// The delimiter used to separate the fields within a line [Default: `','`]
    pub delimiter: Option<char>,
    /// The delimiter used to separate lines from each other [Default: `'\n'`]
    pub line_delimiter: Option<char>,
    /// Name of the header column that holds the longitude [Default: `"lon"`]
    pub lon_key: Option<String>,
    /// Name of the header column that holds the latitude [Default: `"lat"`]
    pub lat_key: Option<String>,
    /// Name of the header column that holds the height value [Default: `None`]
    pub height_key: Option<String>,
}
29
/// Mutable cursor state used while walking through a CSV source.
#[derive(Debug, Clone)]
struct CSVParser {
    /// `true` until the header line has been consumed
    first_line: bool,
    /// Column names taken from the header line
    fields: Vec<String>,
    /// Position in the source where the next read starts
    offset: u64,
    /// Position in the source where reading stops
    end: u64,
    /// Trailing text of the last read that was not yet terminated by a line delimiter
    partial_line: String,
    /// Complete lines that have been read but not yet handed out
    parsed_lines: VecDeque<String>,
}
impl CSVParser {
    /// Build a parser positioned at offset `0` that reads up to `end`,
    /// with no header parsed and nothing buffered.
    pub fn new(end: u64) -> Self {
        let fields = Vec::new();
        let partial_line = String::new();
        let parsed_lines = VecDeque::new();
        Self { first_line: true, fields, offset: 0, end, partial_line, parsed_lines }
    }
    /// Split the header `line` on `delimiter` and store the trimmed column names,
    /// replacing any previously stored ones.
    pub fn parse_first_line(&mut self, line: &str, delimiter: char) {
        let mut names = Vec::new();
        for column in line.split(delimiter) {
            names.push(String::from(column.trim()));
        }
        self.fields = names;
    }
}
56
57/// # CSV Reader
58///
59/// ## Description
60/// Parse (Geo|S2)JSON from a file that is in the CSV format
61///
62/// Implements the [`FeatureReader`] trait
63///
64/// ## Usage
65///
66/// ### File Reader
67/// ```rust
68/// use gistools::{
69///     parsers::{FeatureReader, FileReader},
70///     readers::{CSVReader, CSVReaderOptions},
71/// };
72/// use s2json::{MValue, VectorFeature};
73/// use serde::{Deserialize, Serialize};
74/// use std::path::PathBuf;
75///
76/// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
77/// path.push("tests/readers/csv/fixtures/basic.csv");
78///
79/// #[derive(Debug, Default, Clone, PartialEq, MValue, Serialize, Deserialize)]
80/// struct Test {
81///     name: String,
82/// }
83///
84/// let reader = CSVReader::new(FileReader::from(path), None);
85/// let features: Vec<VectorFeature<(), Test, MValue>> = reader.iter().collect();
86/// ```
87///
88/// ### Buffer Reader
89/// ```rust
90/// use gistools::{
91///     parsers::{FeatureReader, BufferReader},
92///     readers::{CSVReader, CSVReaderOptions},
93/// };
94/// use s2json::{MValue, VectorFeature};
95/// use serde::{Deserialize, Serialize};
96///
97/// // Ignore this setup, just ensures a passing test.
98/// use std::path::PathBuf;
99/// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
100/// path.push("tests/readers/csv/fixtures/basic.csv");
101/// let data = std::fs::read(path).unwrap();
102///
103/// #[derive(Debug, Default, Clone, PartialEq, MValue, Serialize, Deserialize)]
104/// struct Test {
105///     name: String,
106/// }
107///
108/// let reader = CSVReader::new(BufferReader::from(data), None);
109/// let features: Vec<VectorFeature<(), Test, MValue>> = reader.iter().collect();
110/// ```
111///
112/// ## Links
113/// - <https://en.wikipedia.org/wiki/Comma-separated_values>
114/// - <https://cesium.com/blog/2015/04/07/quadtree-cheatseet/>
115#[derive(Debug, Clone)]
116pub struct CSVReader<T: Reader, P: MValueCompatible + DeserializeOwned = MValue> {
117    reader: T,
118    delimiter: char,
119    line_delimiter: char,
120    lon_key: String,
121    lat_key: String,
122    height_key: Option<String>,
123    parser: RefCell<CSVParser>,
124    _phantom: PhantomData<VectorFeature<(), P, ()>>,
125}
126impl<T: Reader, P: MValueCompatible + DeserializeOwned> CSVReader<T, P> {
127    /// Create a new CSVReader
128    ///
129    /// ## Parameters
130    /// - `input`: the input data to parse from
131    /// - `options`: user defined options on how to parse the CSV file
132    ///
133    /// ## Returns
134    /// A new [`CSVReader`]
135    pub fn new(reader: T, options: Option<CSVReaderOptions>) -> CSVReader<T, P> {
136        let options = options.unwrap_or_default();
137        let len = reader.len();
138        CSVReader {
139            reader,
140            delimiter: options.delimiter.unwrap_or(','),
141            line_delimiter: options.line_delimiter.unwrap_or('\n'),
142            lon_key: options.lon_key.unwrap_or("lon".into()),
143            lat_key: options.lat_key.unwrap_or("lat".into()),
144            height_key: options.height_key,
145            parser: RefCell::new(CSVParser::new(len)),
146            _phantom: PhantomData,
147        }
148    }
149
150    /// Set a new position in the file for parallel processing
151    pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
152        // FIRST we run next_feature just to set the contents of the parser (specifically the fields)
153        {
154            *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
155            self.next_feature();
156        }
157        // setup chunk size, start and end
158        let len = self.reader.len();
159        let chunk_size = len.div_ceil(pool_size);
160        let mut start = thread_id.saturating_mul(chunk_size);
161        let mut end = u64::min(start + chunk_size, len);
162        // align to separators
163        if thread_id > 0 {
164            start = align_to_line_delimiter(&self.reader, start, end, self.line_delimiter);
165        }
166        if thread_id < pool_size - 1 {
167            end = align_to_line_delimiter(&self.reader, end, len, self.line_delimiter);
168        }
169        // update parser
170        let mut parser = self.parser.borrow_mut();
171        if thread_id == 0 {
172            *parser = CSVParser::new(end);
173        } else {
174            parser.partial_line.clear();
175            parser.parsed_lines.clear();
176        }
177        parser.offset = start;
178        parser.end = end;
179    }
180
181    /// Grab the next feature if it exists
182    pub fn next_feature(&self) -> Option<VectorFeature<(), P, MValue>> {
183        let mut parser = self.parser.borrow_mut();
184        // Keep returning from the queue if there's data
185        while let Some(line) = parser.parsed_lines.pop_front() {
186            let trimmed = line.trim();
187            if trimmed.is_empty() || trimmed.starts_with('#') {
188                continue;
189            }
190            if parser.first_line {
191                parser.parse_first_line(trimmed, self.delimiter);
192                parser.first_line = false;
193            } else {
194                return Some(self.parse_line(trimmed, &parser.fields));
195            }
196        }
197
198        // Read more if we're not done
199        if parser.offset < parser.end {
200            let length = u64::min(65_536, parser.end - parser.offset);
201            let chunk = parser.partial_line.clone()
202                + &self.reader.parse_string(Some(parser.offset), Some(length));
203            parser.offset += length;
204
205            parser.partial_line.clear();
206            let mut lines = chunk
207                .split(self.line_delimiter)
208                .map(str::to_string)
209                .filter(|s| !s.is_empty())
210                .collect::<Vec<_>>();
211
212            if let Some(last) = lines.pop() {
213                parser.partial_line = last;
214            }
215
216            parser.parsed_lines.extend(lines);
217            // drop the parser before calling next_feature
218            drop(parser);
219            return self.next_feature(); // recurse now that buffer is filled
220        }
221
222        // Final line after file ends
223        if !parser.partial_line.is_empty() {
224            let line = std::mem::take(&mut parser.partial_line);
225            let trimmed = line.trim();
226            if trimmed.is_empty() || trimmed.starts_with('#') {
227                return None;
228            }
229            if parser.first_line {
230                parser.parse_first_line(trimmed, self.delimiter);
231                parser.first_line = false;
232                drop(parser);
233                return self.next_feature(); // recurse again to skip header
234            } else {
235                return Some(self.parse_line(trimmed, &parser.fields));
236            }
237        }
238
239        None
240    }
241
242    /// given a line, parse the values mapped to the first lines fields
243    /// returns a GeoJSON Vector Feature
244    fn parse_line(&self, line: &str, fields: &[String]) -> VectorFeature<(), P, MValue> {
245        let values: Vec<String> =
246            line.split(self.delimiter).map(|v| v.trim().to_string()).collect();
247        let mut properties = Properties::new();
248        let mut coordinates: VectorPoint<MValue> = VectorPoint::default();
249
250        for (value, field) in values.iter().zip(fields.iter()) {
251            if field.is_empty() || value.is_empty() {
252                continue;
253            }
254
255            let value_num: f64 = value.parse().unwrap_or(0.0);
256            if *field == self.lon_key {
257                coordinates.x = value_num;
258            } else if *field == self.lat_key {
259                coordinates.y = value_num;
260            } else if Some(field) == self.height_key.as_ref() {
261                coordinates.z = Some(value_num);
262            } else {
263                properties.insert(field.clone(), value.into());
264            }
265        }
266        if coordinates.x.is_nan() || coordinates.y.is_nan() {
267            panic!("coordinates must be finite numbers");
268        }
269
270        VectorFeature {
271            _type: "VectorFeature".into(),
272            geometry: VectorGeometry::new_point(coordinates, None),
273            properties: (&properties).into(),
274            ..Default::default()
275        }
276    }
277}
278impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVReader<T, P> {
279    type Item = VectorFeature<(), P, MValue>;
280    fn next(&mut self) -> Option<Self::Item> {
281        self.next_feature()
282    }
283}
284/// The CSV Iterator tool
285#[derive(Debug, Clone)]
286pub struct CSVIterator<'a, T: Reader, P: MValueCompatible + DeserializeOwned> {
287    reader: &'a CSVReader<T, P>,
288}
289impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVIterator<'_, T, P> {
290    type Item = VectorFeature<(), P, MValue>;
291
292    fn next(&mut self) -> Option<Self::Item> {
293        self.reader.next_feature()
294    }
295}
296/// A feature reader trait with a callback-based approach
297impl<T: Reader, P: MValueCompatible + DeserializeOwned> FeatureReader<(), P, MValue>
298    for CSVReader<T, P>
299{
300    type FeatureIterator<'a>
301        = CSVIterator<'a, T, P>
302    where
303        T: 'a,
304        P: 'a;
305
306    fn iter(&self) -> Self::FeatureIterator<'_> {
307        *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
308        CSVIterator { reader: self }
309    }
310
311    #[cfg(feature = "std")]
312    fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
313        *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
314        self.par_seek(pool_size as u64, thread_id as u64);
315        CSVIterator { reader: self }
316    }
317}
318
319/// Parse CSV data into a record
320/// the source is the source of the CSV data
321/// the delimiter is the character used to separate fields
322/// the line_delimiter is the character used to separate lines
323/// returns an object with key-value pairs whose keys and values are both strings
324///
325/// Example:
326/// ```rust
327/// use gistools::readers::parse_csv_as_record;
328/// use s2json::MValue;
329///
330/// #[derive(Debug, Default, Clone, PartialEq, MValue)]
331/// struct Test {
332///     a: String,
333///     b: String,
334///     c: String,
335/// }
336/// let source = "a,b,c\n1,2,3\n4,5,6";
337/// let res = parse_csv_as_record::<Test>(source, None, None);
338///
339/// assert_eq!(
340///     res,
341///     vec![
342///         Test { a: "1".into(), b: "2".into(), c: "3".into() },
343///         Test { a: "4".into(), b: "5".into(), c: "6".into() },
344///     ]
345/// );
346/// ```
347pub fn parse_csv_as_record<T: MValueCompatible>(
348    source: &str,
349    delimiter: Option<char>,
350    line_delimiter: Option<char>,
351) -> Vec<T> {
352    let delimiter = delimiter.unwrap_or(',');
353    let line_delimiter = line_delimiter.unwrap_or('\n');
354    let mut res = vec![];
355    let lines: Vec<&str> = source.split(line_delimiter).collect();
356    let header = parse_csv_line(lines[0], delimiter);
357
358    for raw_line in lines.iter().skip(1) {
359        let line = raw_line.trim();
360        if line.is_empty() {
361            continue;
362        }
363
364        let mut record = MValue::new();
365        let values = parse_csv_line(line, delimiter);
366
367        for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
368            let val =
369                if value.trim().is_empty() { (&PrimitiveValue::Null).into() } else { value.into() };
370            record.insert(header.into(), val);
371        }
372
373        res.push(record.into());
374    }
375
376    res
377}
378
379/// Parse CSV data into a BTreeMap record where both the keys and values are strings
380pub fn parse_csv_as_btree(
381    source: &str,
382    delimiter: Option<char>,
383    line_delimiter: Option<char>,
384) -> Vec<BTreeMap<String, String>> {
385    let delimiter = delimiter.unwrap_or(',');
386    let line_delimiter = line_delimiter.unwrap_or('\n');
387    let mut res = vec![];
388    let lines: Vec<&str> = source.split(line_delimiter).collect();
389    let header = parse_csv_line(lines[0], delimiter);
390
391    for raw_line in lines.iter().skip(1) {
392        let line = raw_line.trim();
393        if line.is_empty() {
394            continue;
395        }
396
397        let mut record = BTreeMap::new();
398        let values = parse_csv_line(line, delimiter);
399
400        for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
401            let val = value.trim();
402            if val.is_empty() {
403                continue;
404            }
405            record.insert(header.clone(), val.to_string());
406        }
407
408        res.push(record);
409    }
410
411    res
412}
413
/// Splits one CSV line into its trimmed field values.
///
/// A section opened by `"` or `'` runs until the same character appears again; inside it the
/// delimiter is kept as ordinary text and a doubled quote character stands for one literal
/// quote. The quote characters themselves are not part of the output.
///
/// Note that a trailing empty field is not emitted: `"a,b,"` yields two values.
pub fn parse_csv_line(line: &str, delimiter: char) -> Vec<String> {
    let mut fields: Vec<String> = Vec::new();
    let mut field = String::new();
    // `Some(q)` while inside a section that was opened by quote character `q`
    let mut open_quote: Option<char> = None;
    let mut remaining = line.chars().peekable();

    while let Some(ch) = remaining.next() {
        match open_quote {
            Some(quote) if ch == quote => {
                if remaining.peek() == Some(&quote) {
                    // doubled quote: emit one and swallow its twin
                    remaining.next();
                    field.push(quote);
                } else {
                    open_quote = None;
                }
            }
            // anything else inside quotes, including the delimiter, is literal text
            Some(_) => field.push(ch),
            None if ch == '"' || ch == '\'' => open_quote = Some(ch),
            None if ch == delimiter => {
                fields.push(field.trim().to_string());
                field.clear();
            }
            None => field.push(ch),
        }
    }

    // Flush whatever is left after the last delimiter
    if !field.is_empty() {
        fields.push(field.trim().to_string());
    }

    fields
}
456
457// Helper function to align to delimiters if using parallel processing
458fn align_to_line_delimiter<R: Reader>(reader: &R, mut pos: u64, end: u64, sep: char) -> u64 {
459    let sep_u8 = sep as u8;
460
461    while pos < end {
462        let len = u64::min(65_536, end - pos);
463        let chunk = reader.parse_string(Some(pos), Some(len));
464        if let Some(rel) = chunk.as_bytes().iter().position(|&b| b == sep_u8) {
465            return pos + rel as u64 + 1; // move past delimiter
466        }
467        pos += len;
468    }
469    end
470}