clickhouse_format/output/
json_each_row.rs1use core::marker::PhantomData;
2use std::{
3 collections::HashMap,
4 io::{BufRead as _, Error as IoError},
5};
6
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::format_name::FormatName;
11
12use super::{Output, OutputResult};
13
/// Output decoder for the `JsonEachRow` format, generic over the row type `T`.
///
/// The value itself stores no data; `T` is only recorded at the type level so
/// that the decoder knows which type each row should be deserialized into.
pub struct JsonEachRowOutput<T> {
    // Zero-sized marker tying this decoder to its row type.
    row_marker: PhantomData<T>,
}

impl<T> JsonEachRowOutput<T> {
    /// Creates a new decoder for rows of type `T`.
    pub fn new() -> Self {
        let row_marker = PhantomData;
        Self { row_marker }
    }
}

impl<T> Default for JsonEachRowOutput<T> {
    /// Equivalent to [`JsonEachRowOutput::new`]; implemented by hand so that
    /// no `T: Default` bound is required.
    fn default() -> Self {
        Self::new()
    }
}
29
30pub type GeneralJsonEachRowOutput = JsonEachRowOutput<HashMap<String, Value>>;
31
32#[derive(thiserror::Error, Debug)]
33pub enum JsonEachRowOutputError {
34 #[error("IoError {0:?}")]
35 IoError(#[from] IoError),
36 #[error("SerdeJsonError {0:?}")]
37 SerdeJsonError(#[from] serde_json::Error),
38}
39
40impl<T> Output for JsonEachRowOutput<T>
41where
42 T: DeserializeOwned,
43{
44 type Row = T;
45 type Info = ();
46
47 type Error = JsonEachRowOutputError;
48
49 fn format_name() -> FormatName {
50 FormatName::JsonEachRow
51 }
52
53 fn deserialize(&self, slice: &[u8]) -> OutputResult<Self::Row, Self::Info, Self::Error> {
54 let mut data: Vec<T> = vec![];
55 for line in slice.lines() {
56 let line = line?;
57 let row: T = serde_json::from_str(&line)?;
58 data.push(row);
59 }
60
61 Ok((data, ()))
62 }
63}
64
#[cfg(test)]
mod tests {
    use super::*;

    use std::{fs, path::PathBuf};

    use crate::test_helpers::{TestRow, TEST_ROW_1};

    /// Decodes the `JSONEachRow` fixture both as generic maps and as typed
    /// `TestRow`s, and checks the first row in each case.
    #[test]
    fn simple() -> Result<(), Box<dyn std::error::Error>> {
        let file_path = PathBuf::new().join("tests/files/JSONEachRow.txt");
        let content = fs::read_to_string(&file_path)?;

        // The fixture is named after the format, so its file stem must parse
        // to the same format name this output reports.
        let stem = file_path.file_stem().unwrap().to_string_lossy();
        let format_from_stem = stem.parse::<FormatName>().unwrap();
        assert_eq!(GeneralJsonEachRowOutput::format_name(), format_from_stem);

        // Schema-less decoding: rows come back as string-keyed JSON maps.
        let general_output = GeneralJsonEachRowOutput::new();
        let (general_rows, ()) = general_output.deserialize(content.as_bytes())?;
        let first_general = general_rows.first().unwrap();
        let expected_tuple = Value::Array(vec![1.into(), "a".into()]);
        assert_eq!(first_general.get("tuple1").unwrap(), &expected_tuple);

        // Typed decoding: rows come back as `TestRow` values.
        let typed_output = JsonEachRowOutput::<TestRow>::new();
        let (typed_rows, ()) = typed_output.deserialize(content.as_bytes())?;
        assert_eq!(typed_rows.first().unwrap(), &*TEST_ROW_1);

        Ok(())
    }
}