use std::io::Read;
use serde::{Serialize, de::DeserializeOwned};
use super::FormatError;
/// Deserializes CSV `bytes` (first row treated as headers) into `T` by way of
/// an intermediate JSON value.
///
/// Each data row becomes a JSON object keyed by header name, and all rows are
/// gathered into a JSON array that is handed to `serde_json::from_value`.
/// A field whose text parses as `i64` is stored as a JSON number; every other
/// field (including decimals, booleans and empty text) is stored as a JSON
/// string.
///
/// # Errors
///
/// Returns `FormatError::Serde` when reading the records or the headers fails,
/// or when the assembled JSON array cannot be deserialized into `T`.
pub(crate) fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, FormatError> {
    let mut reader = csv::ReaderBuilder::new()
        .has_headers(true)
        .from_reader(bytes);

    // Read every data row up front; the header row is fetched afterwards.
    let mut rows: Vec<csv::StringRecord> = Vec::new();
    for row in reader.records() {
        rows.push(row.map_err(|e| FormatError::Serde(Box::new(e)))?);
    }

    let column_names: Vec<&str> = reader
        .headers()
        .map_err(|e| FormatError::Serde(Box::new(e)))?
        .iter()
        .collect();

    let mut objects: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
    for row in &rows {
        let mut object = serde_json::Map::new();
        // Pair fields with headers positionally; anything past the shorter of
        // the two sequences is ignored.
        for (name, cell) in column_names.iter().zip(row.iter()) {
            let value = match cell.parse::<i64>() {
                Ok(n) => serde_json::Value::Number(serde_json::Number::from(n)),
                Err(_) => serde_json::Value::String(cell.to_string()),
            };
            object.insert((*name).to_string(), value);
        }
        objects.push(serde_json::Value::Object(object));
    }

    serde_json::from_value(serde_json::Value::Array(objects))
        .map_err(|e| FormatError::Serde(Box::new(e)))
}
/// Iterator over the rows of a CSV source, each deserialized into `T`.
///
/// Wraps the `csv` crate's owning record deserializer; the `Iterator`
/// implementation converts its errors into `FormatError::Serde`.
pub(crate) struct CsvRecordStream<R, T> {
// Underlying `csv` iterator that owns the reader `R` and yields `T` rows.
iter: csv::DeserializeRecordsIntoIter<R, T>,
}
impl<R, T> Iterator for CsvRecordStream<R, T>
where
    R: Read,
    T: DeserializeOwned,
{
    type Item = Result<T, FormatError>;

    /// Yields the next deserialized row, or `None` once the wrapped `csv`
    /// iterator is exhausted. Row-level failures are reported as
    /// `FormatError::Serde`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.iter.next()? {
            Ok(row) => Some(Ok(row)),
            Err(err) => Some(Err(FormatError::Serde(Box::new(err)))),
        }
    }
}
/// Builds a [`CsvRecordStream`] that deserializes rows of type `T` from
/// `reader`.
///
/// The CSV reader is configured with `has_headers(true)`. The function itself
/// returns no error; failures are surfaced per item by the returned iterator.
pub(crate) fn stream_deserialize<T, R>(reader: R) -> CsvRecordStream<R, T>
where
    T: DeserializeOwned,
    R: Read,
{
    let mut builder = csv::ReaderBuilder::new();
    builder.has_headers(true);
    let iter = builder.from_reader(reader).into_deserialize();
    CsvRecordStream { iter }
}
pub(crate) fn serialize<T: Serialize>(value: &T) -> Result<Vec<u8>, FormatError> {
let json_value = serde_json::to_value(value).map_err(|e| FormatError::Serde(Box::new(e)))?;
let mut wtr = csv::Writer::from_writer(Vec::new());
match json_value {
serde_json::Value::Array(arr) => {
if let Some(first) = arr.first() {
if let serde_json::Value::Object(obj) = first {
let headers: Vec<&str> = obj.keys().map(|s| s.as_str()).collect();
wtr.write_record(&headers)
.map_err(|e| FormatError::Serde(Box::new(e)))?;
} else {
return Err(FormatError::Other(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"CSV format requires array of objects or single object",
))));
}
}
for item in arr {
let obj = match item {
serde_json::Value::Object(obj) => obj,
_ => {
return Err(FormatError::Other(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"CSV format requires array of objects or single object",
))));
}
};
let record: Vec<String> = obj
.values()
.map(|v| match v {
serde_json::Value::String(s) => s.clone(),
_ => v.to_string(),
})
.collect();
wtr.write_record(&record)
.map_err(|e| FormatError::Serde(Box::new(e)))?;
}
}
serde_json::Value::Object(obj) => {
let headers: Vec<&str> = obj.keys().map(|s| s.as_str()).collect();
wtr.write_record(&headers)
.map_err(|e| FormatError::Serde(Box::new(e)))?;
let record: Vec<String> = obj
.values()
.map(|v| match v {
serde_json::Value::String(s) => s.clone(),
_ => v.to_string(),
})
.collect();
wtr.write_record(&record)
.map_err(|e| FormatError::Serde(Box::new(e)))?;
}
_ => {
return Err(FormatError::Other(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"CSV format requires an array or object",
))));
}
}
wtr.into_inner()
.map_err(|e| FormatError::Other(Box::new(e)))
}