clickhouse_format/output/
json_each_row_with_progress.rs1use core::marker::PhantomData;
2use std::{
3 collections::HashMap,
4 io::{BufRead as _, Error as IoError},
5};
6
7use serde::{de::DeserializeOwned, Deserialize};
8use serde_aux::field_attributes::deserialize_number_from_string;
9use serde_json::Value;
10
11use crate::format_name::FormatName;
12
13use super::{Output, OutputResult};
14
/// Parser for the `JSONEachRowWithProgress` output format.
///
/// `T` is the type each `{"row": …}` line is deserialized into. The parser
/// itself carries no state; `PhantomData` only records the row type.
pub struct JsonEachRowWithProgressOutput<T> {
    phantom: PhantomData<T>,
}

impl<T> JsonEachRowWithProgressOutput<T> {
    /// Creates a parser for rows of type `T`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl<T> Default for JsonEachRowWithProgressOutput<T> {
    fn default() -> Self {
        JsonEachRowWithProgressOutput {
            phantom: PhantomData,
        }
    }
}
30
31pub type GeneralJsonEachRowWithProgressOutput =
32 JsonEachRowWithProgressOutput<HashMap<String, Value>>;
33
34#[derive(thiserror::Error, Debug)]
35pub enum JsonEachRowWithProgressOutputError {
36 #[error("IoError {0:?}")]
37 IoError(#[from] IoError),
38 #[error("SerdeJsonError {0:?}")]
39 SerdeJsonError(#[from] serde_json::Error),
40 #[error("ProgressInTheWrongPosition")]
41 ProgressInTheWrongPosition,
42 #[error("ProgressMissing")]
43 ProgressMissing,
44}
45
46impl<T> Output for JsonEachRowWithProgressOutput<T>
47where
48 T: DeserializeOwned,
49{
50 type Row = T;
51 type Info = JsonEachRowProgress;
52
53 type Error = JsonEachRowWithProgressOutputError;
54
55 fn format_name() -> FormatName {
56 FormatName::JsonEachRowWithProgress
57 }
58
59 fn deserialize(&self, slice: &[u8]) -> OutputResult<Self::Row, Self::Info, Self::Error> {
60 let mut data: Vec<T> = vec![];
61 let mut info = Option::<JsonEachRowProgress>::None;
62
63 for line in slice.lines() {
64 let line = line?;
65
66 if info.is_some() {
67 return Err(JsonEachRowWithProgressOutputError::ProgressInTheWrongPosition);
68 }
69
70 match serde_json::from_str::<JsonEachRowLine<T>>(&line)? {
71 JsonEachRowLine::Row { row } => {
72 data.push(row);
73 continue;
74 }
75 JsonEachRowLine::Progress { progress } => {
76 info = Some(progress);
77 break;
78 }
79 }
80 }
81
82 let info = info.ok_or(JsonEachRowWithProgressOutputError::ProgressMissing)?;
83
84 Ok((data, info))
85 }
86}
87
88#[derive(Deserialize, Debug, Clone)]
89#[serde(untagged)]
90enum JsonEachRowLine<T>
91where
92 T: Sized,
93{
94 Row { row: T },
95 Progress { progress: JsonEachRowProgress },
96}
97
98#[derive(Deserialize, Debug, Clone)]
99pub struct JsonEachRowProgress {
100 #[serde(deserialize_with = "deserialize_number_from_string")]
101 pub read_rows: usize,
102 #[serde(deserialize_with = "deserialize_number_from_string")]
103 pub read_bytes: usize,
104 #[serde(deserialize_with = "deserialize_number_from_string")]
105 pub written_rows: usize,
106 #[serde(deserialize_with = "deserialize_number_from_string")]
107 pub written_bytes: usize,
108 #[serde(deserialize_with = "deserialize_number_from_string")]
109 pub total_rows_to_read: usize,
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    use std::{fs, path::PathBuf};

    use crate::test_helpers::{TestRow, TEST_ROW_1};

    // Round-trips the shared fixture through both the schema-less and the
    // typed parser.
    #[test]
    fn simple() -> Result<(), Box<dyn std::error::Error>> {
        let file_path = PathBuf::new().join("tests/files/JSONEachRowWithProgress.txt");
        let content = fs::read_to_string(&file_path)?;

        assert_eq!(
            GeneralJsonEachRowWithProgressOutput::format_name(),
            file_path
                .file_stem()
                .unwrap()
                .to_string_lossy()
                .parse()
                .unwrap()
        );

        let (rows, info) =
            GeneralJsonEachRowWithProgressOutput::new().deserialize(content.as_bytes())?;
        assert_eq!(
            rows.first().unwrap().get("tuple1").unwrap(),
            &Value::Array(vec![1.into(), "a".into()])
        );
        assert_eq!(info.read_rows, 2);

        let (rows, info) =
            JsonEachRowWithProgressOutput::<TestRow>::new().deserialize(content.as_bytes())?;
        assert_eq!(rows.first().unwrap(), &*TEST_ROW_1);
        assert_eq!(info.read_rows, 2);

        Ok(())
    }

    // A lone progress line yields no rows, and string-encoded counters are decoded.
    #[test]
    fn progress_only() -> Result<(), Box<dyn std::error::Error>> {
        let content = r#"{"progress":{"read_rows":"2","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"2"}}"#;

        let (rows, info) =
            GeneralJsonEachRowWithProgressOutput::new().deserialize(content.as_bytes())?;
        assert!(rows.is_empty());
        assert_eq!(info.read_rows, 2);
        assert_eq!(info.read_bytes, 16);
        assert_eq!(info.written_rows, 0);
        assert_eq!(info.total_rows_to_read, 2);

        Ok(())
    }

    // Input without a progress line must be rejected.
    #[test]
    fn progress_missing() {
        let content = r#"{"row":{"a":1}}"#;

        let err = GeneralJsonEachRowWithProgressOutput::new()
            .deserialize(content.as_bytes())
            .unwrap_err();
        assert!(matches!(
            err,
            JsonEachRowWithProgressOutputError::ProgressMissing
        ));
    }
}