clickhouse_format/output/
json_compact_each_row.rs

1use core::marker::PhantomData;
2use std::{
3    collections::HashMap,
4    io::{BufRead as _, Error as IoError},
5};
6
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::format_name::FormatName;
11
12use super::{Output, OutputResult};
13
/// Decoder for ClickHouse `JSONCompactEachRow` output.
///
/// Each row arrives as a bare JSON array of values without column names, so
/// the names have to be supplied up front. The type parameter `T` is the row
/// type every line is deserialized into.
///
/// `Debug` and `Clone` are derived so the decoder can be logged and reused;
/// as with any derive, they are only available when `T` implements them too.
#[derive(Debug, Clone)]
pub struct JsonCompactEachRowOutput<T> {
    /// Column names, in the order the values appear inside each row array.
    names: Vec<String>,
    /// Ties the decoder to its row type without storing a value of it.
    phantom: PhantomData<T>,
}

impl<T> JsonCompactEachRowOutput<T> {
    /// Creates a decoder that pairs the values of each row, by position,
    /// with the given column `names`.
    pub fn new(names: Vec<String>) -> Self {
        Self {
            names,
            phantom: PhantomData,
        }
    }
}
26
27pub type GeneralJsonCompactEachRowOutput = JsonCompactEachRowOutput<HashMap<String, Value>>;
28
29#[derive(thiserror::Error, Debug)]
30pub enum JsonCompactEachRowOutputError {
31    #[error("IoError {0:?}")]
32    IoError(#[from] IoError),
33    #[error("SerdeJsonError {0:?}")]
34    SerdeJsonError(#[from] serde_json::Error),
35}
36
37impl<T> Output for JsonCompactEachRowOutput<T>
38where
39    T: DeserializeOwned,
40{
41    type Row = T;
42    type Info = ();
43
44    type Error = JsonCompactEachRowOutputError;
45
46    fn format_name() -> FormatName {
47        FormatName::JsonCompactEachRow
48    }
49
50    fn deserialize(&self, slice: &[u8]) -> OutputResult<Self::Row, Self::Info, Self::Error> {
51        let mut data: Vec<T> = vec![];
52
53        for line in slice.lines() {
54            let line = line?;
55            let values: Vec<Value> = serde_json::from_str(&line)?;
56
57            let row: T = serde_json::from_value(Value::Object(
58                self.names.iter().cloned().zip(values).collect(),
59            ))?;
60
61            data.push(row);
62        }
63
64        Ok((data, ()))
65    }
66}
67
#[cfg(test)]
mod tests {
    use super::*;

    use std::{fs, path::PathBuf};

    use crate::test_helpers::{TestRow, TEST_ROW_1};

    /// Column names, in order, for the rows stored in the fixture file.
    fn column_names() -> Vec<String> {
        ["array1", "array2", "tuple1", "tuple2", "map1"]
            .iter()
            .map(|name| (*name).to_owned())
            .collect()
    }

    #[test]
    fn simple() -> Result<(), Box<dyn std::error::Error>> {
        let fixture = PathBuf::from("tests/files/JSONCompactEachRow.txt");
        let content = fs::read_to_string(&fixture)?;

        // The fixture's file stem must parse into this decoder's format name.
        assert_eq!(
            GeneralJsonCompactEachRowOutput::format_name(),
            fixture
                .file_stem()
                .unwrap()
                .to_string_lossy()
                .parse()
                .unwrap()
        );

        // Untyped decoding: rows come back as name -> JSON value maps.
        let general = GeneralJsonCompactEachRowOutput::new(column_names());
        let (rows, _info): (_, ()) = general.deserialize(content.as_bytes())?;
        assert_eq!(
            rows.first().unwrap().get("tuple1").unwrap(),
            &Value::Array(vec![1.into(), "a".into()])
        );

        // Typed decoding: rows come back as `TestRow` values.
        let typed = JsonCompactEachRowOutput::<TestRow>::new(column_names());
        let (rows, _info): (_, ()) = typed.deserialize(content.as_bytes())?;
        assert_eq!(rows.first().unwrap(), &*TEST_ROW_1);

        Ok(())
    }
}