clickhouse_format/output/
json_each_row_with_progress.rs

1use core::marker::PhantomData;
2use std::{
3    collections::HashMap,
4    io::{BufRead as _, Error as IoError},
5};
6
7use serde::{de::DeserializeOwned, Deserialize};
8use serde_aux::field_attributes::deserialize_number_from_string;
9use serde_json::Value;
10
11use crate::format_name::FormatName;
12
13use super::{Output, OutputResult};
14
/// Zero-sized handle selecting the row type `T` that `JSONEachRowWithProgress`
/// lines are decoded into.
///
/// The value carries no data; `T` is only recorded at the type level.
pub struct JsonEachRowWithProgressOutput<T> {
    // Ties the otherwise unused type parameter to the struct.
    marker: PhantomData<T>,
}

impl<T> JsonEachRowWithProgressOutput<T> {
    /// Creates a new output handle for row type `T`.
    pub fn new() -> Self {
        Self { marker: PhantomData }
    }
}

impl<T> Default for JsonEachRowWithProgressOutput<T> {
    /// Equivalent to [`JsonEachRowWithProgressOutput::new`]; no bound on `T` is required.
    fn default() -> Self {
        Self::new()
    }
}
30
31pub type GeneralJsonEachRowWithProgressOutput =
32    JsonEachRowWithProgressOutput<HashMap<String, Value>>;
33
34#[derive(thiserror::Error, Debug)]
35pub enum JsonEachRowWithProgressOutputError {
36    #[error("IoError {0:?}")]
37    IoError(#[from] IoError),
38    #[error("SerdeJsonError {0:?}")]
39    SerdeJsonError(#[from] serde_json::Error),
40    #[error("ProgressInTheWrongPosition")]
41    ProgressInTheWrongPosition,
42    #[error("ProgressMissing")]
43    ProgressMissing,
44}
45
46impl<T> Output for JsonEachRowWithProgressOutput<T>
47where
48    T: DeserializeOwned,
49{
50    type Row = T;
51    type Info = JsonEachRowProgress;
52
53    type Error = JsonEachRowWithProgressOutputError;
54
55    fn format_name() -> FormatName {
56        FormatName::JsonEachRowWithProgress
57    }
58
59    fn deserialize(&self, slice: &[u8]) -> OutputResult<Self::Row, Self::Info, Self::Error> {
60        let mut data: Vec<T> = vec![];
61        let mut info = Option::<JsonEachRowProgress>::None;
62
63        for line in slice.lines() {
64            let line = line?;
65
66            if info.is_some() {
67                return Err(JsonEachRowWithProgressOutputError::ProgressInTheWrongPosition);
68            }
69
70            match serde_json::from_str::<JsonEachRowLine<T>>(&line)? {
71                JsonEachRowLine::Row { row } => {
72                    data.push(row);
73                    continue;
74                }
75                JsonEachRowLine::Progress { progress } => {
76                    info = Some(progress);
77                    break;
78                }
79            }
80        }
81
82        let info = info.ok_or(JsonEachRowWithProgressOutputError::ProgressMissing)?;
83
84        Ok((data, info))
85    }
86}
87
88#[derive(Deserialize, Debug, Clone)]
89#[serde(untagged)]
90enum JsonEachRowLine<T>
91where
92    T: Sized,
93{
94    Row { row: T },
95    Progress { progress: JsonEachRowProgress },
96}
97
98#[derive(Deserialize, Debug, Clone)]
99pub struct JsonEachRowProgress {
100    #[serde(deserialize_with = "deserialize_number_from_string")]
101    pub read_rows: usize,
102    #[serde(deserialize_with = "deserialize_number_from_string")]
103    pub read_bytes: usize,
104    #[serde(deserialize_with = "deserialize_number_from_string")]
105    pub written_rows: usize,
106    #[serde(deserialize_with = "deserialize_number_from_string")]
107    pub written_bytes: usize,
108    #[serde(deserialize_with = "deserialize_number_from_string")]
109    pub total_rows_to_read: usize,
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    use std::{fs, path::PathBuf};

    use crate::test_helpers::{TestRow, TEST_ROW_1};

    /// Decodes the bundled fixture twice: once into generic JSON maps and once
    /// into the typed `TestRow`.
    #[test]
    fn simple() -> Result<(), Box<dyn std::error::Error>> {
        let fixture = PathBuf::from("tests/files/JSONEachRowWithProgress.txt");
        let raw = fs::read_to_string(&fixture)?;

        // The fixture's file stem has to parse into this output's format name.
        let stem = fixture.file_stem().unwrap().to_string_lossy();
        assert_eq!(
            GeneralJsonEachRowWithProgressOutput::format_name(),
            stem.parse().unwrap()
        );

        // Schema-less decoding.
        let general = GeneralJsonEachRowWithProgressOutput::new();
        let (rows, info) = general.deserialize(raw.as_bytes())?;
        let first = rows.first().unwrap();
        assert_eq!(
            first.get("tuple1").unwrap(),
            &Value::Array(vec![1.into(), "a".into()])
        );
        assert_eq!(info.read_rows, 2);

        // Typed decoding of the same bytes.
        let typed = JsonEachRowWithProgressOutput::<TestRow>::new();
        let (rows, info) = typed.deserialize(raw.as_bytes())?;
        let first = rows.first().unwrap();
        assert_eq!(first, &*TEST_ROW_1);
        assert_eq!(info.read_rows, 2);

        Ok(())
    }
}