1use crate::parsers::{FeatureReader, Reader};
2use alloc::{
3 collections::{BTreeMap, VecDeque},
4 string::{String, ToString},
5 vec,
6 vec::Vec,
7};
8use core::{cell::RefCell, marker::PhantomData};
9use s2json::{
10 MValue, MValueCompatible, PrimitiveValue, Properties, VectorFeature, VectorGeometry,
11 VectorPoint,
12};
13use serde::de::DeserializeOwned;
14
/// User-configurable settings for a CSV reader.
///
/// Every field is optional; `CSVReader::new` substitutes the documented default for
/// each `None`. The struct is `Clone` so one set of options can configure several readers.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CSVReaderOptions {
    /// Field separator within a line; `,` when `None`.
    pub delimiter: Option<char>,
    /// Separator between lines; `\n` when `None`.
    pub line_delimiter: Option<char>,
    /// Header name of the longitude (x) column; `"lon"` when `None`.
    pub lon_key: Option<String>,
    /// Header name of the latitude (y) column; `"lat"` when `None`.
    pub lat_key: Option<String>,
    /// Header name of the height (z) column; no height is read when `None`.
    pub height_key: Option<String>,
}
29
/// Incremental line-splitting state used by `CSVReader` while it walks its byte range.
#[derive(Debug, Clone)]
struct CSVParser {
    /// `true` until the header line has been consumed.
    first_line: bool,
    /// Column names taken from the header line.
    fields: Vec<String>,
    /// Byte offset of the next read.
    offset: u64,
    /// Byte offset (exclusive) at which reading stops.
    end: u64,
    /// Text after the last line delimiter of the previous chunk, not yet terminated.
    partial_line: String,
    /// Complete lines waiting to be turned into features.
    parsed_lines: VecDeque<String>,
}
impl CSVParser {
    /// Creates a parser that reads from byte 0 up to `end` and expects a header first.
    pub fn new(end: u64) -> Self {
        let partial_line = String::new();
        let parsed_lines = VecDeque::new();
        Self {
            first_line: true,
            fields: Vec::new(),
            offset: 0,
            end,
            partial_line,
            parsed_lines,
        }
    }
    /// Replaces the column names with the `delimiter`-separated, trimmed cells of `line`.
    pub fn parse_first_line(&mut self, line: &str, delimiter: char) {
        let columns = line.split(delimiter).map(str::trim).map(String::from);
        self.fields = columns.collect();
    }
}
56
57#[derive(Debug, Clone)]
119pub struct CSVReader<T: Reader, P: MValueCompatible + DeserializeOwned = MValue> {
120 reader: T,
121 delimiter: char,
122 line_delimiter: char,
123 lon_key: String,
124 lat_key: String,
125 height_key: Option<String>,
126 parser: RefCell<CSVParser>,
127 _phantom: PhantomData<VectorFeature<(), P, ()>>,
128}
129impl<T: Reader, P: MValueCompatible + DeserializeOwned> CSVReader<T, P> {
130 pub fn new(reader: T, options: Option<CSVReaderOptions>) -> CSVReader<T, P> {
139 let options = options.unwrap_or_default();
140 let len = reader.len();
141 CSVReader {
142 reader,
143 delimiter: options.delimiter.unwrap_or(','),
144 line_delimiter: options.line_delimiter.unwrap_or('\n'),
145 lon_key: options.lon_key.unwrap_or("lon".into()),
146 lat_key: options.lat_key.unwrap_or("lat".into()),
147 height_key: options.height_key,
148 parser: RefCell::new(CSVParser::new(len)),
149 _phantom: PhantomData,
150 }
151 }
152
153 pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
155 {
157 *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
158 self.next_feature();
159 }
160 let len = self.reader.len();
162 let chunk_size = len.div_ceil(pool_size);
163 let mut start = thread_id.saturating_mul(chunk_size);
164 let mut end = u64::min(start + chunk_size, len);
165 if thread_id > 0 {
167 start = align_to_line_delimiter(&self.reader, start, end, self.line_delimiter);
168 }
169 if thread_id < pool_size - 1 {
170 end = align_to_line_delimiter(&self.reader, end, len, self.line_delimiter);
171 }
172 let mut parser = self.parser.borrow_mut();
174 if thread_id == 0 {
175 *parser = CSVParser::new(end);
176 } else {
177 parser.partial_line.clear();
178 parser.parsed_lines.clear();
179 }
180 parser.offset = start;
181 parser.end = end;
182 }
183
184 pub fn next_feature(&self) -> Option<VectorFeature<(), P, MValue>> {
186 let mut parser = self.parser.borrow_mut();
187 while let Some(line) = parser.parsed_lines.pop_front() {
189 let trimmed = line.trim();
190 if trimmed.is_empty() || trimmed.starts_with('#') {
191 continue;
192 }
193 if parser.first_line {
194 parser.parse_first_line(trimmed, self.delimiter);
195 parser.first_line = false;
196 } else {
197 return Some(self.parse_line(trimmed, &parser.fields));
198 }
199 }
200
201 if parser.offset < parser.end {
203 let length = u64::min(65_536, parser.end - parser.offset);
204 let chunk = parser.partial_line.clone()
205 + &self.reader.parse_string(Some(parser.offset), Some(length));
206 parser.offset += length;
207
208 parser.partial_line.clear();
209 let mut lines = chunk
210 .split(self.line_delimiter)
211 .map(str::to_string)
212 .filter(|s| !s.is_empty())
213 .collect::<Vec<_>>();
214
215 if let Some(last) = lines.pop() {
216 parser.partial_line = last;
217 }
218
219 parser.parsed_lines.extend(lines);
220 drop(parser);
222 return self.next_feature(); }
224
225 if !parser.partial_line.is_empty() {
227 let line = core::mem::take(&mut parser.partial_line);
228 let trimmed = line.trim();
229 if trimmed.is_empty() || trimmed.starts_with('#') {
230 return None;
231 }
232 if parser.first_line {
233 parser.parse_first_line(trimmed, self.delimiter);
234 parser.first_line = false;
235 drop(parser);
236 return self.next_feature(); } else {
238 return Some(self.parse_line(trimmed, &parser.fields));
239 }
240 }
241
242 None
243 }
244
245 fn parse_line(&self, line: &str, fields: &[String]) -> VectorFeature<(), P, MValue> {
248 let values: Vec<String> =
249 line.split(self.delimiter).map(|v| v.trim().to_string()).collect();
250 let mut properties = Properties::new();
251 let mut coordinates: VectorPoint<MValue> = VectorPoint::default();
252
253 for (value, field) in values.iter().zip(fields.iter()) {
254 if field.is_empty() || value.is_empty() {
255 continue;
256 }
257
258 let value_num: f64 = value.parse().unwrap_or(0.0);
259 if *field == self.lon_key {
260 coordinates.x = value_num;
261 } else if *field == self.lat_key {
262 coordinates.y = value_num;
263 } else if Some(field) == self.height_key.as_ref() {
264 coordinates.z = Some(value_num);
265 } else {
266 properties.insert(field.clone(), value.into());
267 }
268 }
269 if coordinates.x.is_nan() || coordinates.y.is_nan() {
270 panic!("coordinates must be finite numbers");
271 }
272
273 VectorFeature {
274 _type: "VectorFeature".into(),
275 geometry: VectorGeometry::new_point(coordinates, None),
276 properties: (&properties).into(),
277 ..Default::default()
278 }
279 }
280}
281impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVReader<T, P> {
282 type Item = VectorFeature<(), P, MValue>;
283 fn next(&mut self) -> Option<Self::Item> {
284 self.next_feature()
285 }
286}
287#[derive(Debug, Clone)]
289pub struct CSVIterator<'a, T: Reader, P: MValueCompatible + DeserializeOwned> {
290 reader: &'a CSVReader<T, P>,
291}
292impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVIterator<'_, T, P> {
293 type Item = VectorFeature<(), P, MValue>;
294
295 fn next(&mut self) -> Option<Self::Item> {
296 self.reader.next_feature()
297 }
298}
299impl<T: Reader, P: MValueCompatible + DeserializeOwned> FeatureReader<(), P, MValue>
301 for CSVReader<T, P>
302{
303 type FeatureIterator<'a>
304 = CSVIterator<'a, T, P>
305 where
306 T: 'a,
307 P: 'a;
308
309 fn iter(&self) -> Self::FeatureIterator<'_> {
310 *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
311 CSVIterator { reader: self }
312 }
313
314 fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
315 *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
316 self.par_seek(pool_size as u64, thread_id as u64);
317 CSVIterator { reader: self }
318 }
319}
320
321pub fn parse_csv_as_record<T: MValueCompatible>(
350 source: &str,
351 delimiter: Option<char>,
352 line_delimiter: Option<char>,
353) -> Vec<T> {
354 let delimiter = delimiter.unwrap_or(',');
355 let line_delimiter = line_delimiter.unwrap_or('\n');
356 let mut res = vec![];
357 let lines: Vec<&str> = source.split(line_delimiter).collect();
358 let header = parse_csv_line(lines[0], delimiter);
359
360 for raw_line in lines.iter().skip(1) {
361 let line = raw_line.trim();
362 if line.is_empty() {
363 continue;
364 }
365
366 let mut record = MValue::new();
367 let values = parse_csv_line(line, delimiter);
368
369 for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
370 let val =
371 if value.trim().is_empty() { (&PrimitiveValue::Null).into() } else { value.into() };
372 record.insert(header.into(), val);
373 }
374
375 res.push(record.into());
376 }
377
378 res
379}
380
381pub fn parse_csv_as_btree(
383 source: &str,
384 delimiter: Option<char>,
385 line_delimiter: Option<char>,
386) -> Vec<BTreeMap<String, String>> {
387 let delimiter = delimiter.unwrap_or(',');
388 let line_delimiter = line_delimiter.unwrap_or('\n');
389 let mut res = vec![];
390 let lines: Vec<&str> = source.split(line_delimiter).collect();
391 let header = parse_csv_line(lines[0], delimiter);
392
393 for raw_line in lines.iter().skip(1) {
394 let line = raw_line.trim();
395 if line.is_empty() {
396 continue;
397 }
398
399 let mut record = BTreeMap::new();
400 let values = parse_csv_line(line, delimiter);
401
402 for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
403 let val = value.trim();
404 if val.is_empty() {
405 continue;
406 }
407 record.insert(header.clone(), val.to_string());
408 }
409
410 res.push(record);
411 }
412
413 res
414}
415
/// Splits one CSV line into whitespace-trimmed cells, honouring quoted sections.
///
/// A cell may be wrapped in double (`"`) or single (`'`) quotes. Inside a quoted section
/// the delimiter is literal, and the opening quote character is escaped by doubling it
/// (`""` → `"`). A quote only opens a quoted section at the start of a cell (leading
/// whitespace allowed), so apostrophes inside unquoted text such as `O'Brien` are kept
/// verbatim.
///
/// If the line ends with the delimiter, no empty final cell is emitted. An empty line
/// yields no cells.
pub fn parse_csv_line(line: &str, delimiter: char) -> Vec<String> {
    let mut result = Vec::new();
    let mut current = String::new();
    // Quote character that opened the currently active quoted section, if any.
    let mut open_quote: Option<char> = None;
    let mut chars = line.chars().peekable();

    while let Some(ch) = chars.next() {
        match open_quote {
            Some(quote) if ch == quote => {
                if chars.peek() == Some(&quote) {
                    // Doubled quote inside a quoted section: emit one literal quote.
                    current.push(quote);
                    chars.next();
                } else {
                    open_quote = None;
                }
            }
            Some(_) => current.push(ch),
            // RFC 4180: quotes only delimit a cell when they start it.
            None if (ch == '"' || ch == '\'') && current.trim().is_empty() => {
                open_quote = Some(ch);
            }
            None if ch == delimiter => {
                result.push(current.trim().into());
                current.clear();
            }
            None => current.push(ch),
        }
    }

    if !current.is_empty() {
        result.push(current.trim().into());
    }

    result
}
458
459fn align_to_line_delimiter<R: Reader>(reader: &R, mut pos: u64, end: u64, sep: char) -> u64 {
461 let sep_u8 = sep as u8;
462
463 while pos < end {
464 let len = u64::min(65_536, end - pos);
465 let chunk = reader.parse_string(Some(pos), Some(len));
466 if let Some(rel) = chunk.as_bytes().iter().position(|&b| b == sep_u8) {
467 return pos + rel as u64 + 1; }
469 pos += len;
470 }
471 end
472}