use crate::parsers::{FeatureReader, Reader};
use alloc::{
collections::{BTreeMap, VecDeque},
string::{String, ToString},
vec,
vec::Vec,
};
use core::{cell::RefCell, marker::PhantomData};
use s2json::{
MValue, MValueCompatible, PrimitiveValue, Properties, VectorFeature, VectorGeometry,
VectorPoint,
};
use serde::de::DeserializeOwned;
/// Options controlling how a [`CSVReader`] interprets its input.
///
/// Every field is optional; `CSVReader::new` substitutes a default for each `None`.
#[derive(Debug, Default)]
pub struct CSVReaderOptions {
/// Character separating the values within a line; `,` when `None`.
pub delimiter: Option<char>,
/// Character separating the lines of the input; `\n` when `None`.
pub line_delimiter: Option<char>,
/// Name of the column that supplies the point's x coordinate; `"lon"` when `None`.
pub lon_key: Option<String>,
/// Name of the column that supplies the point's y coordinate; `"lat"` when `None`.
pub lat_key: Option<String>,
/// Name of the column that supplies the point's z coordinate; when `None` no column
/// is read as a height.
pub height_key: Option<String>,
}
/// Mutable parsing state of a `CSVReader`: the header columns, the read cursor and
/// the buffered lines.
#[derive(Debug, Clone)]
struct CSVParser {
    /// `true` until the header line has been consumed.
    first_line: bool,
    /// Column names taken from the header line.
    fields: Vec<String>,
    /// Reader position at which the next chunk is read.
    offset: u64,
    /// Reader position at which reading stops.
    end: u64,
    /// Trailing text of the last chunk that is not yet known to be a whole line.
    partial_line: String,
    /// Complete lines that have been read but not yet turned into features.
    parsed_lines: VecDeque<String>,
}
impl CSVParser {
    /// Creates a parser positioned at offset 0 that still expects a header line and
    /// stops reading at `end`.
    pub fn new(end: u64) -> Self {
        Self {
            end,
            offset: 0,
            first_line: true,
            fields: Vec::new(),
            parsed_lines: VecDeque::new(),
            partial_line: String::new(),
        }
    }
    /// Splits the header `line` on `delimiter` and stores the trimmed column names,
    /// replacing any names stored before.
    pub fn parse_first_line(&mut self, line: &str, delimiter: char) {
        let mut names = Vec::new();
        for raw in line.split(delimiter) {
            names.push(String::from(raw.trim()));
        }
        self.fields = names;
    }
}
/// Reads point features out of CSV text supplied by a [`Reader`].
///
/// The first line that is neither empty nor a `#` comment is used as the header; every
/// later such line becomes one `VectorFeature` whose geometry is a point.
/// `P` is the properties type of the produced features and defaults to `MValue`.
#[derive(Debug, Clone)]
pub struct CSVReader<T: Reader, P: MValueCompatible + DeserializeOwned = MValue> {
/// Source of the CSV text.
reader: T,
/// Character separating the values within a line.
delimiter: char,
/// Character separating the lines of the input.
line_delimiter: char,
/// Header name of the column stored as the point's x coordinate.
lon_key: String,
/// Header name of the column stored as the point's y coordinate.
lat_key: String,
/// Header name of the column stored as the point's z coordinate, if any.
height_key: Option<String>,
/// Parsing state; kept in a `RefCell` so features can be pulled through `&self`.
parser: RefCell<CSVParser>,
/// Ties the otherwise unused type parameter `P` to the struct.
_phantom: PhantomData<VectorFeature<(), P, ()>>,
}
impl<T: Reader, P: MValueCompatible + DeserializeOwned> CSVReader<T, P> {
pub fn new(reader: T, options: Option<CSVReaderOptions>) -> CSVReader<T, P> {
let options = options.unwrap_or_default();
let len = reader.len();
CSVReader {
reader,
delimiter: options.delimiter.unwrap_or(','),
line_delimiter: options.line_delimiter.unwrap_or('\n'),
lon_key: options.lon_key.unwrap_or("lon".into()),
lat_key: options.lat_key.unwrap_or("lat".into()),
height_key: options.height_key,
parser: RefCell::new(CSVParser::new(len)),
_phantom: PhantomData,
}
}
/// Restricts the reader to the part of the input owned by worker `thread_id` out of
/// `pool_size` workers.
///
/// The input is cut into `pool_size` ranges of (almost) equal size. Every boundary between
/// two ranges is then moved forward to just after the next line delimiter, so that each line
/// is read by exactly one worker. The header is always parsed from the top of the input
/// first, so workers that start in the middle still know the column names.
///
/// A `pool_size` of 0 is treated as 1. A `thread_id` at or beyond `pool_size` yields an
/// empty range.
pub fn par_seek(&self, pool_size: u64, thread_id: u64) {
    let len = self.reader.len();
    // Parse the header from the start of the input. The call also parses the first data
    // line; that feature is deliberately discarded, only the column names matter here.
    *self.parser.borrow_mut() = CSVParser::new(len);
    self.next_feature();

    // Guard against a zero-sized pool: it would divide by zero below and underflow
    // `pool_size - 1`.
    let pool_size = pool_size.max(1);
    let chunk_size = len.div_ceil(pool_size);
    let mut start = thread_id.saturating_mul(chunk_size).min(len);
    let mut end = start.saturating_add(chunk_size).min(len);
    if thread_id > 0 {
        // Search all the way to the end of the input, not just to this worker's nominal
        // end: when a single line spans the whole nominal range, the previous worker
        // already owns that line up to its delimiter, and this worker must start after
        // it instead of in the middle of it.
        start = align_to_line_delimiter(&self.reader, start, len, self.line_delimiter);
    }
    if thread_id < pool_size - 1 {
        end = align_to_line_delimiter(&self.reader, end, len, self.line_delimiter);
    }

    let mut parser = self.parser.borrow_mut();
    if thread_id == 0 {
        // The first worker re-reads the header as part of its own range.
        *parser = CSVParser::new(end);
    } else {
        // Keep the parsed header, drop everything buffered while reading it.
        parser.partial_line.clear();
        parser.parsed_lines.clear();
    }
    parser.offset = start;
    parser.end = end;
}
/// Returns the next feature of the current range, or `None` once the range is exhausted.
///
/// The input is read in chunks of at most 65 536 units of reader length. Each chunk is
/// split on the line delimiter; the piece after the last delimiter may be an incomplete
/// line, so it is kept back and prefixed to the next chunk. Empty lines and lines starting
/// with `#` are skipped. The first remaining line is consumed as the header.
///
/// # Panics
///
/// Panics if a data line yields a NaN x or y coordinate (see `parse_line`), or if the
/// parser is already borrowed.
pub fn next_feature(&self) -> Option<VectorFeature<(), P, MValue>> {
    let mut parser = self.parser.borrow_mut();
    loop {
        // 1. Serve lines that are already buffered.
        while let Some(line) = parser.parsed_lines.pop_front() {
            let trimmed = line.trim();
            if trimmed.is_empty() || trimmed.starts_with('#') {
                continue;
            }
            if parser.first_line {
                parser.parse_first_line(trimmed, self.delimiter);
                parser.first_line = false;
            } else {
                return Some(self.parse_line(trimmed, &parser.fields));
            }
        }

        // 2. Buffer is empty: read the next chunk, if any is left.
        if parser.offset < parser.end {
            let length = u64::min(65_536, parser.end - parser.offset);
            // Continue the line that the previous chunk left unfinished.
            let mut chunk = core::mem::take(&mut parser.partial_line);
            chunk.push_str(&self.reader.parse_string(Some(parser.offset), Some(length)));
            parser.offset += length;

            let mut pieces: Vec<&str> = chunk.split(self.line_delimiter).collect();
            // The piece after the last delimiter is the only one that may be incomplete.
            // It has to be taken off BEFORE empty pieces are dropped: when the chunk ends
            // exactly on a delimiter this piece is empty, and the line in front of it is
            // complete and must not be merged with the next chunk.
            if let Some(tail) = pieces.pop() {
                parser.partial_line = String::from(tail);
            }
            parser
                .parsed_lines
                .extend(pieces.into_iter().filter(|piece| !piece.is_empty()).map(String::from));
            continue;
        }

        // 3. Nothing left to read: the kept-back piece is the final line.
        if parser.partial_line.is_empty() {
            return None;
        }
        let line = core::mem::take(&mut parser.partial_line);
        let trimmed = line.trim();
        if trimmed.is_empty() || trimmed.starts_with('#') {
            return None;
        }
        if parser.first_line {
            // The input consists of a header only, so there is no feature to return.
            parser.parse_first_line(trimmed, self.delimiter);
            parser.first_line = false;
            return None;
        }
        return Some(self.parse_line(trimmed, &parser.fields));
    }
}
/// Converts one data line into a point feature.
///
/// The line is split on the value delimiter and every trimmed value is paired with the
/// header name at the same position. Pairs with an empty name or an empty value are
/// ignored. Values of the lon, lat and height columns become the x, y and z coordinates
/// (text that does not parse as a number counts as `0.0`); every other value is stored
/// as a property under its column name.
///
/// # Panics
///
/// Panics when the resulting x or y coordinate is NaN.
fn parse_line(&self, line: &str, fields: &[String]) -> VectorFeature<(), P, MValue> {
    let mut properties = Properties::new();
    let mut coordinates: VectorPoint<MValue> = VectorPoint::default();
    let cells = line.split(self.delimiter).map(|cell| cell.trim().to_string());
    // `zip` stops at the shorter side, so surplus values or surplus columns are ignored.
    for (value, field) in cells.zip(fields) {
        if field.is_empty() || value.is_empty() {
            continue;
        }
        let as_number = || value.parse::<f64>().unwrap_or(0.0);
        if field == &self.lon_key {
            coordinates.x = as_number();
        } else if field == &self.lat_key {
            coordinates.y = as_number();
        } else if self.height_key.as_ref() == Some(field) {
            coordinates.z = Some(as_number());
        } else {
            properties.insert(field.clone(), (&value).into());
        }
    }
    // Only NaN is rejected here; infinities pass this check.
    if coordinates.x.is_nan() || coordinates.y.is_nan() {
        panic!("coordinates must be finite numbers");
    }
    VectorFeature {
        _type: "VectorFeature".into(),
        geometry: VectorGeometry::new_point(coordinates, None),
        properties: (&properties).into(),
        ..Default::default()
    }
}
}
/// Lets a mutably borrowed `CSVReader` be used directly as an iterator of features.
///
/// Iteration continues from the reader's current parser state; it does not reset it.
impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVReader<T, P> {
type Item = VectorFeature<(), P, MValue>;
/// Returns the next feature by delegating to `CSVReader::next_feature`.
fn next(&mut self) -> Option<Self::Item> {
self.next_feature()
}
}
/// Borrowing iterator over the features of a `CSVReader`.
///
/// The iterator holds no state of its own: all progress lives in the reader's parser, so
/// every `CSVIterator` created from the same reader advances the same cursor.
#[derive(Debug, Clone)]
pub struct CSVIterator<'a, T: Reader, P: MValueCompatible + DeserializeOwned> {
/// Reader the features are pulled from.
reader: &'a CSVReader<T, P>,
}
impl<T: Reader, P: MValueCompatible + DeserializeOwned> Iterator for CSVIterator<'_, T, P> {
type Item = VectorFeature<(), P, MValue>;
/// Returns the next feature by delegating to `CSVReader::next_feature`.
fn next(&mut self) -> Option<Self::Item> {
self.reader.next_feature()
}
}
/// `FeatureReader` implementation that hands out `CSVIterator`s over this reader.
impl<T: Reader, P: MValueCompatible + DeserializeOwned> FeatureReader<(), P, MValue>
for CSVReader<T, P>
{
type FeatureIterator<'a>
= CSVIterator<'a, T, P>
where
T: 'a,
P: 'a;
/// Resets the parser so it covers the whole input again and returns an iterator over
/// every feature.
fn iter(&self) -> Self::FeatureIterator<'_> {
*self.parser.borrow_mut() = CSVParser::new(self.reader.len());
CSVIterator { reader: self }
}
/// Resets the parser, restricts it to the range owned by worker `thread_id` out of
/// `pool_size` workers (see `par_seek`) and returns an iterator over that range.
fn par_iter(&self, pool_size: usize, thread_id: usize) -> Self::FeatureIterator<'_> {
// `par_seek` replaces the parser itself before reading the header, so this reset
// is a precaution rather than a requirement.
*self.parser.borrow_mut() = CSVParser::new(self.reader.len());
self.par_seek(pool_size as u64, thread_id as u64);
CSVIterator { reader: self }
}
}
/// Parses a whole CSV document into one record of type `T` per data line.
///
/// The first line of `source` is the header. Every later line that is not blank is parsed
/// with `parse_csv_line` and its values are paired with the header names by position;
/// values beyond the last header name are ignored. A value that is empty after trimming is
/// stored as `PrimitiveValue::Null`.
///
/// `delimiter` defaults to `,` and `line_delimiter` to `\n`.
pub fn parse_csv_as_record<T: MValueCompatible>(
    source: &str,
    delimiter: Option<char>,
    line_delimiter: Option<char>,
) -> Vec<T> {
    let delimiter = delimiter.unwrap_or(',');
    let line_delimiter = line_delimiter.unwrap_or('\n');
    let mut rows = source.split(line_delimiter);
    // `split` always yields at least one piece, so the header row is always present.
    let header = parse_csv_line(rows.next().unwrap_or(""), delimiter);
    rows.map(str::trim)
        .filter(|row| !row.is_empty())
        .map(|row| -> T {
            let mut record = MValue::new();
            let values = parse_csv_line(row, delimiter);
            for (value, key) in values.iter().zip(header.iter()) {
                let val = if value.trim().is_empty() {
                    (&PrimitiveValue::Null).into()
                } else {
                    value.into()
                };
                record.insert(key.into(), val);
            }
            record.into()
        })
        .collect()
}
/// Parses a whole CSV document into one `BTreeMap` of column name to value per data line.
///
/// The first line of `source` is the header. Every later line that is not blank is parsed
/// with `parse_csv_line` and its values are paired with the header names by position;
/// values beyond the last header name are ignored. Values that are empty after trimming
/// are left out of the map. When two columns share a name, the later column wins.
///
/// `delimiter` defaults to `,` and `line_delimiter` to `\n`.
pub fn parse_csv_as_btree(
    source: &str,
    delimiter: Option<char>,
    line_delimiter: Option<char>,
) -> Vec<BTreeMap<String, String>> {
    let delimiter = delimiter.unwrap_or(',');
    let line_delimiter = line_delimiter.unwrap_or('\n');
    let mut rows = source.split(line_delimiter);
    // `split` always yields at least one piece, so the header row is always present.
    let header = parse_csv_line(rows.next().unwrap_or(""), delimiter);
    rows.map(str::trim)
        .filter(|row| !row.is_empty())
        .map(|row| {
            let mut record = BTreeMap::new();
            let values = parse_csv_line(row, delimiter);
            for (value, key) in values.iter().zip(header.iter()) {
                let cell = value.trim();
                if !cell.is_empty() {
                    record.insert(key.clone(), String::from(cell));
                }
            }
            record
        })
        .collect()
}
/// Splits one CSV line into its trimmed field values, honouring quotes.
///
/// * Both `"` and `'` open a quoted section, wherever in a field they appear; the section
///   is closed by the same character. The quote characters themselves are not part of the
///   value.
/// * Inside a quoted section the delimiter is ordinary text and a doubled quote character
///   stands for one literal quote character.
/// * Every field is trimmed, including whitespace that was inside quotes.
/// * Text left after the last delimiter is only emitted when it is non-empty, so a
///   trailing delimiter does not produce a trailing empty field and an empty line yields
///   no fields.
pub fn parse_csv_line(line: &str, delimiter: char) -> Vec<String> {
    let mut fields: Vec<String> = Vec::new();
    let mut field = String::new();
    // `Some(q)` while inside a section opened by the quote character `q`.
    let mut open_quote: Option<char> = None;
    let mut chars = line.chars().peekable();
    while let Some(ch) = chars.next() {
        match open_quote {
            None if ch == '"' || ch == '\'' => open_quote = Some(ch),
            Some(quote) if ch == quote => {
                if chars.peek() == Some(&quote) {
                    // Doubled quote: keep one and skip its twin.
                    field.push(quote);
                    chars.next();
                } else {
                    open_quote = None;
                }
            }
            None if ch == delimiter => {
                fields.push(field.trim().into());
                field.clear();
            }
            _ => field.push(ch),
        }
    }
    if !field.is_empty() {
        fields.push(field.trim().into());
    }
    fields
}
/// Returns the position just after the first occurrence of `sep` at or after `pos`, or
/// `end` when `sep` does not occur before `end`.
///
/// The input is scanned in chunks of at most 65 536 units of reader length. The delimiter
/// is located as a character, so delimiters outside the ASCII range are matched correctly
/// and skipped with their full encoded length.
///
/// NOTE(review): the returned position assumes that byte offsets inside the string
/// returned by `parse_string` map one-to-one onto reader positions, and a multi-byte
/// delimiter that straddles two chunks is not detected. Confirm against the `Reader`
/// implementations in use.
fn align_to_line_delimiter<R: Reader>(reader: &R, mut pos: u64, end: u64, sep: char) -> u64 {
    let sep_len = sep.len_utf8() as u64;
    while pos < end {
        let len = u64::min(65_536, end - pos);
        let chunk = reader.parse_string(Some(pos), Some(len));
        if let Some(rel) = chunk.find(sep) {
            return pos + rel as u64 + sep_len;
        }
        pos += len;
    }
    end
}