gistools/readers/csv/
mod.rs

1use crate::parsers::{FeatureReader, Reader};
2use alloc::{
3    collections::{BTreeMap, VecDeque},
4    string::{String, ToString},
5    vec,
6    vec::Vec,
7};
8use core::{cell::RefCell, marker::PhantomData};
9use s2json::{
10    MValue, MValueCompatible, PrimitiveValue, Properties, VectorFeature, VectorGeometry,
11    VectorPoint,
12};
13use serde::de::DeserializeOwned;
14
/// User defined options on how to parse the CSV file.
///
/// Every field is optional; a `None` falls back to the default noted on the field
/// when the options are handed to `CSVReader::new`.
#[derive(Debug, Default)]
pub struct CSVReaderOptions {
    /// The delimiter used to separate the fields within a line [Default: `','`]
    pub delimiter: Option<char>,
    /// The delimiter used to separate lines from each other [Default: `'\n'`]
    pub line_delimiter: Option<char>,
    /// Name of the header column that holds the longitude [Default: `"lon"`]
    pub lon_key: Option<String>,
    /// Name of the header column that holds the latitude [Default: `"lat"`]
    pub lat_key: Option<String>,
    /// Name of the header column that holds the height value [Default: `None`]
    pub height_key: Option<String>,
}
29
/// Mutable parsing state shared between successive reads of the CSV input.
#[derive(Debug, Clone)]
struct CSVParser {
    /// True until the header line has been consumed
    first_line: bool,
    /// Column names taken from the header line
    fields: Vec<String>,
    /// Byte position of the next read
    offset: u64,
    /// Byte position at which reading stops
    end: u64,
    /// Trailing text of the last read that has not been terminated by a line delimiter yet
    partial_line: String,
    /// Complete lines waiting to be turned into features
    parsed_lines: VecDeque<String>,
}
impl CSVParser {
    /// Build a parser positioned at the start of the input that stops reading at `end`
    pub fn new(end: u64) -> Self {
        let fields = Vec::new();
        let partial_line = String::new();
        let parsed_lines = VecDeque::new();
        Self { first_line: true, fields, offset: 0, end, partial_line, parsed_lines }
    }
    /// Split the header `line` on `delimiter` and store the trimmed pieces as the field names,
    /// replacing any previously stored names
    pub fn parse_first_line(&mut self, line: &str, delimiter: char) {
        self.fields.clear();
        self.fields.extend(line.split(delimiter).map(str::trim).map(String::from));
    }
}
56
/// # CSV Reader
///
/// ## Description
/// Reads a CSV file and turns every data row into a point [`VectorFeature`].
/// The first non-empty, non-comment line is treated as the header; the columns named by the
/// longitude/latitude (and optional height) keys become the point coordinates and every other
/// non-empty column is stored as a property. Lines starting with `#` are skipped.
///
/// Implements the [`FeatureReader`] trait
///
/// ## Usage
///
/// CSV Reader utilizes any struct that implements the [`Reader`] trait.
/// Options are [`crate::parsers::BufferReader`], [`crate::parsers::FileReader`], and [`crate::parsers::MMapReader`].
///
/// ### File Reader
/// ```rust
/// use gistools::{
///     parsers::{FeatureReader, FileReader},
///     readers::{CSVReader, CSVReaderOptions},
/// };
/// use s2json::{MValue, VectorFeature};
/// use serde::{Deserialize, Serialize};
/// use std::path::PathBuf;
///
/// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
/// path.push("tests/readers/csv/fixtures/basic.csv");
///
/// #[derive(Debug, Default, Clone, PartialEq, MValue, Serialize, Deserialize)]
/// struct Test {
///     name: String,
/// }
///
/// let reader = CSVReader::new(FileReader::from(path), None);
/// let features: Vec<VectorFeature<(), Test, MValue>> = reader.iter().collect();
/// ```
///
/// ### Buffer Reader
/// ```rust
/// use gistools::{
///     parsers::{FeatureReader, BufferReader},
///     readers::{CSVReader, CSVReaderOptions},
/// };
/// use s2json::{MValue, VectorFeature};
/// use serde::{Deserialize, Serialize};
///
/// // Ignore this setup, just ensures a passing test.
/// use std::path::PathBuf;
/// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
/// path.push("tests/readers/csv/fixtures/basic.csv");
/// let data = std::fs::read(path).unwrap();
///
/// #[derive(Debug, Default, Clone, PartialEq, MValue, Serialize, Deserialize)]
/// struct Test {
///     name: String,
/// }
///
/// let reader = CSVReader::new(BufferReader::from(data), None);
/// let features: Vec<VectorFeature<(), Test, MValue>> = reader.iter().collect();
/// ```
///
/// ## Links
/// - <https://en.wikipedia.org/wiki/Comma-separated_values>
/// - <https://cesium.com/blog/2015/04/07/quadtree-cheatseet/>
#[derive(Debug, Clone)]
pub struct CSVReader<T: Reader, P: MValueCompatible + DeserializeOwned = MValue> {
    /// Source of the raw CSV text
    reader: T,
    /// Character separating the fields of a line
    delimiter: char,
    /// Character separating lines
    line_delimiter: char,
    /// Header name of the longitude column
    lon_key: String,
    /// Header name of the latitude column
    lat_key: String,
    /// Header name of the height column, if any
    height_key: Option<String>,
    /// Parsing state; kept in a `RefCell` so features can be pulled through `&self`
    parser: RefCell<CSVParser>,
    /// Ties the properties type `P` to the reader without storing a value of it
    _phantom: PhantomData<VectorFeature<(), P, ()>>,
}
129impl<T: Reader, P: MValueCompatible + DeserializeOwned> CSVReader<T, P> {
130    /// Create a new CSVReader
131    ///
132    /// ## Parameters
133    /// - `input`: the input data to parse from
134    /// - `options`: user defined options on how to parse the CSV file
135    ///
136    /// ## Returns
137    /// A new [`CSVReader`]
138    pub fn new(reader: T, options: Option<CSVReaderOptions>) -> CSVReader<T, P> {
139        let options = options.unwrap_or_default();
140        let len = reader.len();
141        CSVReader {
142            reader,
143            delimiter: options.delimiter.unwrap_or(','),
144            line_delimiter: options.line_delimiter.unwrap_or('\n'),
145            lon_key: options.lon_key.unwrap_or("lon".into()),
146            lat_key: options.lat_key.unwrap_or("lat".into()),
147            height_key: options.height_key,
148            parser: RefCell::new(CSVParser::new(len)),
149            _phantom: PhantomData,
150        }
151    }
152
153    /// Set a new position in the file for parallel processing
154    pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
155        // FIRST we run next_feature just to set the contents of the parser (specifically the fields)
156        {
157            *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
158            self.next_feature();
159        }
160        // setup chunk size, start and end
161        let len = self.reader.len();
162        let chunk_size = len.div_ceil(pool_size);
163        let mut start = thread_id.saturating_mul(chunk_size);
164        let mut end = u64::min(start + chunk_size, len);
165        // align to separators
166        if thread_id > 0 {
167            start = align_to_line_delimiter(&self.reader, start, end, self.line_delimiter);
168        }
169        if thread_id < pool_size - 1 {
170            end = align_to_line_delimiter(&self.reader, end, len, self.line_delimiter);
171        }
172        // update parser
173        let mut parser = self.parser.borrow_mut();
174        if thread_id == 0 {
175            *parser = CSVParser::new(end);
176        } else {
177            parser.partial_line.clear();
178            parser.parsed_lines.clear();
179        }
180        parser.offset = start;
181        parser.end = end;
182    }
183
184    /// Grab the next feature if it exists
185    pub fn next_feature(&self) -> Option<VectorFeature<(), P, MValue>> {
186        let mut parser = self.parser.borrow_mut();
187        // Keep returning from the queue if there's data
188        while let Some(line) = parser.parsed_lines.pop_front() {
189            let trimmed = line.trim();
190            if trimmed.is_empty() || trimmed.starts_with('#') {
191                continue;
192            }
193            if parser.first_line {
194                parser.parse_first_line(trimmed, self.delimiter);
195                parser.first_line = false;
196            } else {
197                return Some(self.parse_line(trimmed, &parser.fields));
198            }
199        }
200
201        // Read more if we're not done
202        if parser.offset < parser.end {
203            let length = u64::min(65_536, parser.end - parser.offset);
204            let chunk = parser.partial_line.clone()
205                + &self.reader.parse_string(Some(parser.offset), Some(length));
206            parser.offset += length;
207
208            parser.partial_line.clear();
209            let mut lines = chunk
210                .split(self.line_delimiter)
211                .map(str::to_string)
212                .filter(|s| !s.is_empty())
213                .collect::<Vec<_>>();
214
215            if let Some(last) = lines.pop() {
216                parser.partial_line = last;
217            }
218
219            parser.parsed_lines.extend(lines);
220            // drop the parser before calling next_feature
221            drop(parser);
222            return self.next_feature(); // recurse now that buffer is filled
223        }
224
225        // Final line after file ends
226        if !parser.partial_line.is_empty() {
227            let line = core::mem::take(&mut parser.partial_line);
228            let trimmed = line.trim();
229            if trimmed.is_empty() || trimmed.starts_with('#') {
230                return None;
231            }
232            if parser.first_line {
233                parser.parse_first_line(trimmed, self.delimiter);
234                parser.first_line = false;
235                drop(parser);
236                return self.next_feature(); // recurse again to skip header
237            } else {
238                return Some(self.parse_line(trimmed, &parser.fields));
239            }
240        }
241
242        None
243    }
244
245    /// given a line, parse the values mapped to the first lines fields
246    /// returns a GeoJSON Vector Feature
247    fn parse_line(&self, line: &str, fields: &[String]) -> VectorFeature<(), P, MValue> {
248        let values: Vec<String> =
249            line.split(self.delimiter).map(|v| v.trim().to_string()).collect();
250        let mut properties = Properties::new();
251        let mut coordinates: VectorPoint<MValue> = VectorPoint::default();
252
253        for (value, field) in values.iter().zip(fields.iter()) {
254            if field.is_empty() || value.is_empty() {
255                continue;
256            }
257
258            let value_num: f64 = value.parse().unwrap_or(0.0);
259            if *field == self.lon_key {
260                coordinates.x = value_num;
261            } else if *field == self.lat_key {
262                coordinates.y = value_num;
263            } else if Some(field) == self.height_key.as_ref() {
264                coordinates.z = Some(value_num);
265            } else {
266                properties.insert(field.clone(), value.into());
267            }
268        }
269        if coordinates.x.is_nan() || coordinates.y.is_nan() {
270            panic!("coordinates must be finite numbers");
271        }
272
273        VectorFeature {
274            _type: "VectorFeature".into(),
275            geometry: VectorGeometry::new_point(coordinates, None),
276            properties: (&properties).into(),
277            ..Default::default()
278        }
279    }
280}
281impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVReader<T, P> {
282    type Item = VectorFeature<(), P, MValue>;
283    fn next(&mut self) -> Option<Self::Item> {
284        self.next_feature()
285    }
286}
287/// The CSV Iterator tool
288#[derive(Debug, Clone)]
289pub struct CSVIterator<'a, T: Reader, P: MValueCompatible + DeserializeOwned> {
290    reader: &'a CSVReader<T, P>,
291}
292impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVIterator<'_, T, P> {
293    type Item = VectorFeature<(), P, MValue>;
294
295    fn next(&mut self) -> Option<Self::Item> {
296        self.reader.next_feature()
297    }
298}
299/// A feature reader trait with a callback-based approach
300impl<T: Reader, P: MValueCompatible + DeserializeOwned> FeatureReader<(), P, MValue>
301    for CSVReader<T, P>
302{
303    type FeatureIterator<'a>
304        = CSVIterator<'a, T, P>
305    where
306        T: 'a,
307        P: 'a;
308
309    fn iter(&self) -> Self::FeatureIterator<'_> {
310        *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
311        CSVIterator { reader: self }
312    }
313
314    fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
315        *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
316        self.par_seek(pool_size as u64, thread_id as u64);
317        CSVIterator { reader: self }
318    }
319}
320
321/// Parse CSV data into a record
322/// the source is the source of the CSV data
323/// the delimiter is the character used to separate fields
324/// the line_delimiter is the character used to separate lines
325/// returns an object with key-value pairs whose keys and values are both strings
326///
327/// Example:
328/// ```rust
329/// use gistools::readers::parse_csv_as_record;
330/// use s2json::MValue;
331///
332/// #[derive(Debug, Default, Clone, PartialEq, MValue)]
333/// struct Test {
334///     a: String,
335///     b: String,
336///     c: String,
337/// }
338/// let source = "a,b,c\n1,2,3\n4,5,6";
339/// let res = parse_csv_as_record::<Test>(source, None, None);
340///
341/// assert_eq!(
342///     res,
343///     vec![
344///         Test { a: "1".into(), b: "2".into(), c: "3".into() },
345///         Test { a: "4".into(), b: "5".into(), c: "6".into() },
346///     ]
347/// );
348/// ```
349pub fn parse_csv_as_record<T: MValueCompatible>(
350    source: &str,
351    delimiter: Option<char>,
352    line_delimiter: Option<char>,
353) -> Vec<T> {
354    let delimiter = delimiter.unwrap_or(',');
355    let line_delimiter = line_delimiter.unwrap_or('\n');
356    let mut res = vec![];
357    let lines: Vec<&str> = source.split(line_delimiter).collect();
358    let header = parse_csv_line(lines[0], delimiter);
359
360    for raw_line in lines.iter().skip(1) {
361        let line = raw_line.trim();
362        if line.is_empty() {
363            continue;
364        }
365
366        let mut record = MValue::new();
367        let values = parse_csv_line(line, delimiter);
368
369        for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
370            let val =
371                if value.trim().is_empty() { (&PrimitiveValue::Null).into() } else { value.into() };
372            record.insert(header.into(), val);
373        }
374
375        res.push(record.into());
376    }
377
378    res
379}
380
381/// Parse CSV data into a BTreeMap record where both the keys and values are strings
382pub fn parse_csv_as_btree(
383    source: &str,
384    delimiter: Option<char>,
385    line_delimiter: Option<char>,
386) -> Vec<BTreeMap<String, String>> {
387    let delimiter = delimiter.unwrap_or(',');
388    let line_delimiter = line_delimiter.unwrap_or('\n');
389    let mut res = vec![];
390    let lines: Vec<&str> = source.split(line_delimiter).collect();
391    let header = parse_csv_line(lines[0], delimiter);
392
393    for raw_line in lines.iter().skip(1) {
394        let line = raw_line.trim();
395        if line.is_empty() {
396            continue;
397        }
398
399        let mut record = BTreeMap::new();
400        let values = parse_csv_line(line, delimiter);
401
402        for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
403            let val = value.trim();
404            if val.is_empty() {
405                continue;
406            }
407            record.insert(header.clone(), val.to_string());
408        }
409
410        res.push(record);
411    }
412
413    res
414}
415
/// Splits one CSV line into its trimmed field values.
///
/// A field may be wrapped in double or single quotes; inside such a field the delimiter is
/// ordinary text and a doubled quote character stands for one literal quote. The quote
/// characters themselves are not part of the value. A trailing field is only emitted when it
/// contains at least one character.
pub fn parse_csv_line(line: &str, delimiter: char) -> Vec<String> {
    let mut fields: Vec<String> = Vec::new();
    let mut field = String::new();
    // `Some(q)` while inside a field that was opened with the quote character `q`
    let mut open_quote: Option<char> = None;
    let mut stream = line.chars().peekable();

    while let Some(ch) = stream.next() {
        match open_quote {
            // opening quote
            None if ch == '"' || ch == '\'' => open_quote = Some(ch),
            // the active quote character: either an escaped quote or the closing one
            Some(quote) if ch == quote => {
                if stream.peek() == Some(&ch) {
                    field.push(ch);
                    stream.next(); // Skip the second half of the escaped quote
                } else {
                    open_quote = None;
                }
            }
            // field boundary
            None if ch == delimiter => {
                fields.push(field.trim().to_string());
                field.clear();
            }
            // ordinary character
            _ => field.push(ch),
        }
    }

    // Emit the final field
    if !field.is_empty() {
        fields.push(field.trim().to_string());
    }

    fields
}
458
459// Helper function to align to delimiters if using parallel processing
460fn align_to_line_delimiter<R: Reader>(reader: &R, mut pos: u64, end: u64, sep: char) -> u64 {
461    let sep_u8 = sep as u8;
462
463    while pos < end {
464        let len = u64::min(65_536, end - pos);
465        let chunk = reader.parse_string(Some(pos), Some(len));
466        if let Some(rel) = chunk.as_bytes().iter().position(|&b| b == sep_u8) {
467            return pos + rel as u64 + 1; // move past delimiter
468        }
469        pos += len;
470    }
471    end
472}