use core::marker::PhantomData;
use std::{
collections::HashMap,
io::{BufRead as _, Error as IoError},
};
use serde::{de::DeserializeOwned, Deserialize};
use serde_aux::field_attributes::deserialize_number_from_string;
use serde_json::Value;
use crate::format_name::FormatName;
use super::{Output, OutputResult};
pub struct JsonEachRowWithProgressOutput<T> {
phantom: PhantomData<T>,
}
impl<T> Default for JsonEachRowWithProgressOutput<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> JsonEachRowWithProgressOutput<T> {
pub fn new() -> Self {
Self {
phantom: PhantomData,
}
}
}
pub type GeneralJsonEachRowWithProgressOutput =
JsonEachRowWithProgressOutput<HashMap<String, Value>>;
#[derive(thiserror::Error, Debug)]
pub enum JsonEachRowWithProgressOutputError {
#[error("IoError {0:?}")]
IoError(#[from] IoError),
#[error("SerdeJsonError {0:?}")]
SerdeJsonError(#[from] serde_json::Error),
#[error("ProgressInTheWrongPosition")]
ProgressInTheWrongPosition,
#[error("ProgressMissing")]
ProgressMissing,
}
impl<T> Output for JsonEachRowWithProgressOutput<T>
where
T: DeserializeOwned,
{
type Row = T;
type Info = JsonEachRowProgress;
type Error = JsonEachRowWithProgressOutputError;
fn format_name() -> FormatName {
FormatName::JsonEachRowWithProgress
}
fn deserialize(&self, slice: &[u8]) -> OutputResult<Self::Row, Self::Info, Self::Error> {
let mut data: Vec<T> = vec![];
let mut info = Option::<JsonEachRowProgress>::None;
for line in slice.lines() {
let line = line?;
if info.is_some() {
return Err(JsonEachRowWithProgressOutputError::ProgressInTheWrongPosition);
}
match serde_json::from_str::<JsonEachRowLine<T>>(&line)? {
JsonEachRowLine::Row { row } => {
data.push(row);
continue;
}
JsonEachRowLine::Progress { progress } => {
info = Some(progress);
break;
}
}
}
let info = info.ok_or(JsonEachRowWithProgressOutputError::ProgressMissing)?;
Ok((data, info))
}
}
#[derive(Deserialize, Debug, Clone)]
#[serde(untagged)]
enum JsonEachRowLine<T>
where
T: Sized,
{
Row { row: T },
Progress { progress: JsonEachRowProgress },
}
#[derive(Deserialize, Debug, Clone)]
pub struct JsonEachRowProgress {
#[serde(deserialize_with = "deserialize_number_from_string")]
pub read_rows: usize,
#[serde(deserialize_with = "deserialize_number_from_string")]
pub read_bytes: usize,
#[serde(deserialize_with = "deserialize_number_from_string")]
pub written_rows: usize,
#[serde(deserialize_with = "deserialize_number_from_string")]
pub written_bytes: usize,
#[serde(deserialize_with = "deserialize_number_from_string")]
pub total_rows_to_read: usize,
}
#[cfg(test)]
mod tests {
use super::*;
use std::{fs, path::PathBuf};
use crate::test_helpers::{TestRow, TEST_ROW_1};
#[test]
fn simple() -> Result<(), Box<dyn std::error::Error>> {
let file_path = PathBuf::new().join("tests/files/JSONEachRowWithProgress.txt");
let content = fs::read_to_string(&file_path)?;
assert_eq!(
GeneralJsonEachRowWithProgressOutput::format_name(),
file_path
.file_stem()
.unwrap()
.to_string_lossy()
.parse()
.unwrap()
);
let (rows, info) =
GeneralJsonEachRowWithProgressOutput::new().deserialize(content.as_bytes())?;
assert_eq!(
rows.first().unwrap().get("tuple1").unwrap(),
&Value::Array(vec![1.into(), "a".into()])
);
assert_eq!(info.read_rows, 2);
let (rows, info) =
JsonEachRowWithProgressOutput::<TestRow>::new().deserialize(content.as_bytes())?;
assert_eq!(rows.first().unwrap(), &*TEST_ROW_1);
assert_eq!(info.read_rows, 2);
Ok(())
}
}