1use crate::parsers::{FeatureReader, Reader};
2use alloc::{
3 collections::{BTreeMap, VecDeque},
4 string::{String, ToString},
5 vec,
6 vec::Vec,
7};
8use core::{cell::RefCell, marker::PhantomData};
9use s2json::{
10 MValue, MValueCompatible, PrimitiveValue, Properties, VectorFeature, VectorGeometry,
11 VectorPoint,
12};
13use serde::de::DeserializeOwned;
14
/// User-facing configuration for [`CSVReader`].
///
/// Every field is optional; when left as `None` the reader substitutes its own default.
#[derive(Default, Debug)]
pub struct CSVReaderOptions {
    /// Separator between fields within a line (the reader falls back to `,`).
    pub delimiter: Option<char>,
    /// Separator between lines (the reader falls back to `\n`).
    pub line_delimiter: Option<char>,
    /// Header name of the column holding the x coordinate (the reader falls back to `"lon"`).
    pub lon_key: Option<String>,
    /// Header name of the column holding the y coordinate (the reader falls back to `"lat"`).
    pub lat_key: Option<String>,
    /// Header name of the column holding the z coordinate; when `None` no z value is read.
    pub height_key: Option<String>,
}
29
/// Incremental parsing state used by [`CSVReader`] while it walks the input in chunks.
#[derive(Debug, Clone)]
struct CSVParser {
    /// `true` until the header line has been consumed.
    first_line: bool,
    /// Column names taken from the header line, each trimmed.
    fields: Vec<String>,
    /// Byte offset of the next chunk to read.
    offset: u64,
    /// Byte offset (exclusive) at which reading stops.
    end: u64,
    /// Text after the last line delimiter of the most recent chunk, not yet terminated.
    partial_line: String,
    /// Complete lines waiting to be turned into features.
    parsed_lines: VecDeque<String>,
}
impl CSVParser {
    /// Creates a parser positioned at offset 0 that stops at `end` and expects a header first.
    pub fn new(end: u64) -> Self {
        Self {
            offset: 0,
            end,
            first_line: true,
            fields: Vec::new(),
            parsed_lines: VecDeque::new(),
            partial_line: String::new(),
        }
    }

    /// Stores the header's column names: `line` split on `delimiter`, each piece trimmed.
    pub fn parse_first_line(&mut self, line: &str, delimiter: char) {
        let names = line.split(delimiter).map(str::trim).map(String::from);
        self.fields = names.collect();
    }
}
56
57#[derive(Debug, Clone)]
116pub struct CSVReader<T: Reader, P: MValueCompatible + DeserializeOwned = MValue> {
117 reader: T,
118 delimiter: char,
119 line_delimiter: char,
120 lon_key: String,
121 lat_key: String,
122 height_key: Option<String>,
123 parser: RefCell<CSVParser>,
124 _phantom: PhantomData<VectorFeature<(), P, ()>>,
125}
126impl<T: Reader, P: MValueCompatible + DeserializeOwned> CSVReader<T, P> {
127 pub fn new(reader: T, options: Option<CSVReaderOptions>) -> CSVReader<T, P> {
136 let options = options.unwrap_or_default();
137 let len = reader.len();
138 CSVReader {
139 reader,
140 delimiter: options.delimiter.unwrap_or(','),
141 line_delimiter: options.line_delimiter.unwrap_or('\n'),
142 lon_key: options.lon_key.unwrap_or("lon".into()),
143 lat_key: options.lat_key.unwrap_or("lat".into()),
144 height_key: options.height_key,
145 parser: RefCell::new(CSVParser::new(len)),
146 _phantom: PhantomData,
147 }
148 }
149
150 pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
152 {
154 *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
155 self.next_feature();
156 }
157 let len = self.reader.len();
159 let chunk_size = len.div_ceil(pool_size);
160 let mut start = thread_id.saturating_mul(chunk_size);
161 let mut end = u64::min(start + chunk_size, len);
162 if thread_id > 0 {
164 start = align_to_line_delimiter(&self.reader, start, end, self.line_delimiter);
165 }
166 if thread_id < pool_size - 1 {
167 end = align_to_line_delimiter(&self.reader, end, len, self.line_delimiter);
168 }
169 let mut parser = self.parser.borrow_mut();
171 if thread_id == 0 {
172 *parser = CSVParser::new(end);
173 } else {
174 parser.partial_line.clear();
175 parser.parsed_lines.clear();
176 }
177 parser.offset = start;
178 parser.end = end;
179 }
180
181 pub fn next_feature(&self) -> Option<VectorFeature<(), P, MValue>> {
183 let mut parser = self.parser.borrow_mut();
184 while let Some(line) = parser.parsed_lines.pop_front() {
186 let trimmed = line.trim();
187 if trimmed.is_empty() || trimmed.starts_with('#') {
188 continue;
189 }
190 if parser.first_line {
191 parser.parse_first_line(trimmed, self.delimiter);
192 parser.first_line = false;
193 } else {
194 return Some(self.parse_line(trimmed, &parser.fields));
195 }
196 }
197
198 if parser.offset < parser.end {
200 let length = u64::min(65_536, parser.end - parser.offset);
201 let chunk = parser.partial_line.clone()
202 + &self.reader.parse_string(Some(parser.offset), Some(length));
203 parser.offset += length;
204
205 parser.partial_line.clear();
206 let mut lines = chunk
207 .split(self.line_delimiter)
208 .map(str::to_string)
209 .filter(|s| !s.is_empty())
210 .collect::<Vec<_>>();
211
212 if let Some(last) = lines.pop() {
213 parser.partial_line = last;
214 }
215
216 parser.parsed_lines.extend(lines);
217 drop(parser);
219 return self.next_feature(); }
221
222 if !parser.partial_line.is_empty() {
224 let line = core::mem::take(&mut parser.partial_line);
225 let trimmed = line.trim();
226 if trimmed.is_empty() || trimmed.starts_with('#') {
227 return None;
228 }
229 if parser.first_line {
230 parser.parse_first_line(trimmed, self.delimiter);
231 parser.first_line = false;
232 drop(parser);
233 return self.next_feature(); } else {
235 return Some(self.parse_line(trimmed, &parser.fields));
236 }
237 }
238
239 None
240 }
241
242 fn parse_line(&self, line: &str, fields: &[String]) -> VectorFeature<(), P, MValue> {
245 let values: Vec<String> =
246 line.split(self.delimiter).map(|v| v.trim().to_string()).collect();
247 let mut properties = Properties::new();
248 let mut coordinates: VectorPoint<MValue> = VectorPoint::default();
249
250 for (value, field) in values.iter().zip(fields.iter()) {
251 if field.is_empty() || value.is_empty() {
252 continue;
253 }
254
255 let value_num: f64 = value.parse().unwrap_or(0.0);
256 if *field == self.lon_key {
257 coordinates.x = value_num;
258 } else if *field == self.lat_key {
259 coordinates.y = value_num;
260 } else if Some(field) == self.height_key.as_ref() {
261 coordinates.z = Some(value_num);
262 } else {
263 properties.insert(field.clone(), value.into());
264 }
265 }
266 if coordinates.x.is_nan() || coordinates.y.is_nan() {
267 panic!("coordinates must be finite numbers");
268 }
269
270 VectorFeature {
271 _type: "VectorFeature".into(),
272 geometry: VectorGeometry::new_point(coordinates, None),
273 properties: (&properties).into(),
274 ..Default::default()
275 }
276 }
277}
278impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVReader<T, P> {
279 type Item = VectorFeature<(), P, MValue>;
280 fn next(&mut self) -> Option<Self::Item> {
281 self.next_feature()
282 }
283}
284#[derive(Debug, Clone)]
286pub struct CSVIterator<'a, T: Reader, P: MValueCompatible + DeserializeOwned> {
287 reader: &'a CSVReader<T, P>,
288}
289impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVIterator<'_, T, P> {
290 type Item = VectorFeature<(), P, MValue>;
291
292 fn next(&mut self) -> Option<Self::Item> {
293 self.reader.next_feature()
294 }
295}
296impl<T: Reader, P: MValueCompatible + DeserializeOwned> FeatureReader<(), P, MValue>
298 for CSVReader<T, P>
299{
300 type FeatureIterator<'a>
301 = CSVIterator<'a, T, P>
302 where
303 T: 'a,
304 P: 'a;
305
306 fn iter(&self) -> Self::FeatureIterator<'_> {
307 *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
308 CSVIterator { reader: self }
309 }
310
311 fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
312 *self.parser.borrow_mut() = CSVParser::new(self.reader.len());
313 self.par_seek(pool_size as u64, thread_id as u64);
314 CSVIterator { reader: self }
315 }
316}
317
318pub fn parse_csv_as_record<T: MValueCompatible>(
347 source: &str,
348 delimiter: Option<char>,
349 line_delimiter: Option<char>,
350) -> Vec<T> {
351 let delimiter = delimiter.unwrap_or(',');
352 let line_delimiter = line_delimiter.unwrap_or('\n');
353 let mut res = vec![];
354 let lines: Vec<&str> = source.split(line_delimiter).collect();
355 let header = parse_csv_line(lines[0], delimiter);
356
357 for raw_line in lines.iter().skip(1) {
358 let line = raw_line.trim();
359 if line.is_empty() {
360 continue;
361 }
362
363 let mut record = MValue::new();
364 let values = parse_csv_line(line, delimiter);
365
366 for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
367 let val =
368 if value.trim().is_empty() { (&PrimitiveValue::Null).into() } else { value.into() };
369 record.insert(header.into(), val);
370 }
371
372 res.push(record.into());
373 }
374
375 res
376}
377
378pub fn parse_csv_as_btree(
380 source: &str,
381 delimiter: Option<char>,
382 line_delimiter: Option<char>,
383) -> Vec<BTreeMap<String, String>> {
384 let delimiter = delimiter.unwrap_or(',');
385 let line_delimiter = line_delimiter.unwrap_or('\n');
386 let mut res = vec![];
387 let lines: Vec<&str> = source.split(line_delimiter).collect();
388 let header = parse_csv_line(lines[0], delimiter);
389
390 for raw_line in lines.iter().skip(1) {
391 let line = raw_line.trim();
392 if line.is_empty() {
393 continue;
394 }
395
396 let mut record = BTreeMap::new();
397 let values = parse_csv_line(line, delimiter);
398
399 for (value, header) in values.iter().take(header.len()).zip(header.iter()) {
400 let val = value.trim();
401 if val.is_empty() {
402 continue;
403 }
404 record.insert(header.clone(), val.to_string());
405 }
406
407 res.push(record);
408 }
409
410 res
411}
412
/// Splits one CSV line into trimmed fields, honouring quoted fields.
///
/// A field may be wrapped in `"` or `'`. Inside a quoted section the delimiter is literal, and
/// a doubled quote character (`""` / `''`) stands for one quote. A quote character only opens
/// a quoted section when it is the first non-whitespace character of a field, so apostrophes
/// and inch marks inside plain values (`O'Brien`, `5" pipe`) are kept verbatim.
///
/// Every field is trimmed, including the contents of quoted fields. If the last field is empty
/// (e.g. the line ends with the delimiter) it is not emitted, and an empty line yields no
/// fields.
pub fn parse_csv_line(line: &str, delimiter: char) -> Vec<String> {
    let mut result = Vec::new();
    let mut current = String::new();
    // `Some(q)` while inside a section quoted with `q`.
    let mut quote: Option<char> = None;
    let mut chars = line.chars().peekable();

    while let Some(ch) = chars.next() {
        match quote {
            Some(q) if ch == q => {
                if chars.peek() == Some(&q) {
                    // Doubled quote inside a quoted section: emit one literal quote.
                    current.push(q);
                    chars.next();
                } else {
                    quote = None;
                }
            }
            Some(_) => current.push(ch),
            // Only a quote at the start of a field opens a quoted section.
            None if (ch == '"' || ch == '\'') && current.trim().is_empty() => quote = Some(ch),
            None if ch == delimiter => {
                result.push(current.trim().to_string());
                current.clear();
            }
            None => current.push(ch),
        }
    }

    if !current.is_empty() {
        result.push(current.trim().to_string());
    }

    result
}
455
456fn align_to_line_delimiter<R: Reader>(reader: &R, mut pos: u64, end: u64, sep: char) -> u64 {
458 let sep_u8 = sep as u8;
459
460 while pos < end {
461 let len = u64::min(65_536, end - pos);
462 let chunk = reader.parse_string(Some(pos), Some(len));
463 if let Some(rel) = chunk.as_bytes().iter().position(|&b| b == sep_u8) {
464 return pos + rel as u64 + 1; }
466 pos += len;
467 }
468 end
469}