clickhouse_format/output/
json_each_row.rs

1use core::marker::PhantomData;
2use std::{
3    collections::HashMap,
4    io::{BufRead as _, Error as IoError},
5};
6
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::format_name::FormatName;
11
12use super::{Output, OutputResult};
13
/// Zero-sized decoder handle parameterised by the row type `T`.
///
/// The struct stores no data: `T` is only recorded through [`PhantomData`] so
/// that the trait implementations on this type know which row type to produce.
pub struct JsonEachRowOutput<T> {
    /// Type-level marker tying this handle to the row type `T`.
    phantom: PhantomData<T>,
}
17impl<T> Default for JsonEachRowOutput<T> {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22impl<T> JsonEachRowOutput<T> {
23    pub fn new() -> Self {
24        Self {
25            phantom: PhantomData,
26        }
27    }
28}
29
30pub type GeneralJsonEachRowOutput = JsonEachRowOutput<HashMap<String, Value>>;
31
32#[derive(thiserror::Error, Debug)]
33pub enum JsonEachRowOutputError {
34    #[error("IoError {0:?}")]
35    IoError(#[from] IoError),
36    #[error("SerdeJsonError {0:?}")]
37    SerdeJsonError(#[from] serde_json::Error),
38}
39
40impl<T> Output for JsonEachRowOutput<T>
41where
42    T: DeserializeOwned,
43{
44    type Row = T;
45    type Info = ();
46
47    type Error = JsonEachRowOutputError;
48
49    fn format_name() -> FormatName {
50        FormatName::JsonEachRow
51    }
52
53    fn deserialize(&self, slice: &[u8]) -> OutputResult<Self::Row, Self::Info, Self::Error> {
54        let mut data: Vec<T> = vec![];
55        for line in slice.lines() {
56            let line = line?;
57            let row: T = serde_json::from_str(&line)?;
58            data.push(row);
59        }
60
61        Ok((data, ()))
62    }
63}
64
#[cfg(test)]
mod tests {
    use super::*;

    use std::{fs, path::PathBuf};

    use crate::test_helpers::{TestRow, TEST_ROW_1};

    /// Decodes the `JSONEachRow` fixture both as generic maps and as typed
    /// `TestRow` values, and checks that the fixture's file stem parses to the
    /// format name reported by the output.
    #[test]
    fn simple() -> Result<(), Box<dyn std::error::Error>> {
        let fixture = PathBuf::from("tests/files/JSONEachRow.txt");
        let payload = fs::read_to_string(&fixture)?;

        // The fixture is named after the format, so its stem must parse back
        // to the same `FormatName`.
        let stem = fixture.file_stem().unwrap().to_string_lossy();
        let name_from_file: FormatName = stem.parse().unwrap();
        assert_eq!(GeneralJsonEachRowOutput::format_name(), name_from_file);

        // Schema-less decoding: rows are `HashMap<String, Value>`.
        let (general_rows, ()) =
            GeneralJsonEachRowOutput::new().deserialize(payload.as_bytes())?;
        let first_general = general_rows.first().unwrap();
        let expected_tuple = Value::Array(vec![1.into(), "a".into()]);
        assert_eq!(first_general.get("tuple1").unwrap(), &expected_tuple);

        // Typed decoding: rows are `TestRow`.
        let (typed_rows, ()) =
            JsonEachRowOutput::<TestRow>::new().deserialize(payload.as_bytes())?;
        let first_typed = typed_rows.first().unwrap();
        assert_eq!(first_typed, &*TEST_ROW_1);

        Ok(())
    }
}