clickhouse_format/output/
tsv_raw.rs1use core::marker::PhantomData;
2use std::collections::HashMap;
3
4use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter};
5use serde::de::DeserializeOwned;
6
7use crate::format_name::FormatName;
8
9use super::{Output, OutputResult};
10
/// Decoder for the `TSVRaw` output format, producing rows of type `T`.
///
/// Column names and types are both optional. Names (when present) drive by-name
/// deserialization of each record; names and types together populate the info map.
pub struct TsvRawOutput<T> {
    /// Column names, used as the header for every record when set.
    names: Option<Vec<String>>,
    /// Column types, paired positionally with `names` to build the info map.
    types: Option<Vec<String>>,
    /// Ties the decoder to its row type without storing a `T`.
    phantom: PhantomData<T>,
}

16impl<T> Default for TsvRawOutput<T> {
17 fn default() -> Self {
18 Self::new()
19 }
20}
21impl<T> TsvRawOutput<T> {
22 pub fn new() -> Self {
23 Self {
24 names: None,
25 types: None,
26 phantom: PhantomData,
27 }
28 }
29 pub fn with_names(names: Vec<String>) -> Self {
30 Self {
31 names: Some(names),
32 types: None,
33 phantom: PhantomData,
34 }
35 }
36 pub fn with_names_and_types(names: Vec<String>, types: Vec<String>) -> Self {
37 Self {
38 names: Some(names),
39 types: Some(types),
40 phantom: PhantomData,
41 }
42 }
43 pub(crate) fn from_raw_parts(names: Option<Vec<String>>, types: Option<Vec<String>>) -> Self {
44 Self {
45 names,
46 types,
47 phantom: PhantomData,
48 }
49 }
50}
51
52impl<T> Output for TsvRawOutput<T>
53where
54 T: DeserializeOwned,
55{
56 type Row = T;
57 type Info = Option<HashMap<String, String>>;
58
59 type Error = csv::Error;
60
61 fn format_name() -> FormatName {
62 FormatName::TsvRaw
63 }
64
65 fn deserialize(&self, slice: &[u8]) -> OutputResult<Self::Row, Self::Info, Self::Error> {
66 let rdr = ReaderBuilder::new()
67 .delimiter(b'\t')
68 .has_headers(false)
69 .from_reader(slice);
70
71 self.deserialize_with_records(rdr.into_records())
72 }
73}
74impl<T> TsvRawOutput<T>
75where
76 T: DeserializeOwned,
77{
78 pub(crate) fn deserialize_with_records(
79 &self,
80 records: StringRecordsIntoIter<&[u8]>,
81 ) -> OutputResult<<Self as Output>::Row, <Self as Output>::Info, <Self as Output>::Error> {
82 let header = &self.names.to_owned().map(StringRecord::from);
83 let mut data: Vec<T> = vec![];
84 for record in records {
85 let record = record?;
86 let row: T = record.deserialize(header.as_ref())?;
87 data.push(row);
88 }
89
90 let info = if let Some(types) = &self.types {
91 self.names
92 .to_owned()
93 .map(|x| x.into_iter().zip(types.to_owned()).collect())
94 } else {
95 None
96 };
97
98 Ok((data, info))
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    use std::{fs, path::PathBuf};

    use crate::test_helpers::{TestStringsRow, TEST_STRINGS_ROW_1};

    /// Decodes the `TSVRaw` fixture both into name-keyed maps and into a typed row.
    #[test]
    fn simple() -> Result<(), Box<dyn std::error::Error>> {
        let fixture = PathBuf::from("tests/files/TSVRaw.tsv");
        let body = fs::read_to_string(&fixture)?;

        // The fixture is named after the format, so its stem must parse back to it.
        let stem = fixture.file_stem().unwrap().to_string_lossy();
        assert_eq!(
            TsvRawOutput::<HashMap<String, String>>::format_name(),
            stem.parse().unwrap()
        );

        // With explicit column names each row becomes a map keyed by column name.
        let column_names: Vec<String> = ["array1", "array2", "tuple1", "tuple2", "map1"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let (map_rows, map_info) =
            TsvRawOutput::<HashMap<String, String>>::with_names(column_names)
                .deserialize(body.as_bytes())?;
        assert_eq!(map_rows.first().unwrap().get("tuple1").unwrap(), "(1,'a')");
        assert_eq!(map_info, None);

        // Without names, the record is decoded straight into the typed row.
        let (typed_rows, typed_info) =
            TsvRawOutput::<TestStringsRow>::new().deserialize(body.as_bytes())?;
        assert_eq!(typed_rows.first().unwrap(), &*TEST_STRINGS_ROW_1);
        assert_eq!(typed_info, None);

        Ok(())
    }
}