clickhouse_client/orm/
query.rs1use std::marker::PhantomData;
6
7use crate::{
8 error::Error,
9 intf::Interface,
10 query::{Format, Where},
11 Client,
12};
13
14use super::ChRecord;
15
16pub struct OrmQuery<'a, T, U>
18where
19 T: Interface,
20 U: ChRecord,
21{
22 client: &'a Client<T>,
24 record: PhantomData<U>,
26}
27
28impl<T> Client<T>
29where
30 T: Interface,
31{
32 pub fn orm<U>(&self) -> OrmQuery<T, U>
34 where
35 U: ChRecord,
36 {
37 OrmQuery {
38 client: self,
39 record: PhantomData,
40 }
41 }
42}
43
44impl<'a, T, U> OrmQuery<'a, T, U>
45where
46 T: Interface,
47 U: ChRecord,
48{
49 #[tracing::instrument(skip(self), fields(table = U::ch_schema().name))]
51 pub async fn create_table(&self, engine: &str) -> Result<(), Error> {
52 let schema = U::ch_schema();
53 self.client.ddl().create_table(&schema, engine).await
54 }
55
56 #[tracing::instrument(skip(self), fields(table = U::ch_schema().name))]
58 pub async fn drop_table(&self) -> Result<(), Error> {
59 let schema = U::ch_schema();
60 self.client.ddl().drop_table(&schema.name).await
61 }
62
63 #[tracing::instrument(skip(self, records), fields(table = U::ch_schema().name))]
65 pub async fn insert(&self, records: Vec<U>) -> Result<(), Error> {
66 let schema = U::ch_schema();
67 let table = U::to_query_data(records);
68 let _res = self.client.crud().insert(&schema.name, table).await?;
69 Ok(())
70 }
71
72 #[tracing::instrument(skip(self))]
78 pub async fn select(&self, where_cond: Option<Where>) -> Result<Vec<U>, Error> {
79 let schema = U::ch_schema();
80 let table = self
81 .client
82 .crud()
83 .format(Format::RowBinaryWithNamesAndTypes)
84 .select(&schema.name, vec![], where_cond)
85 .await?
86 .into_table(None)?;
87 U::from_query_data(table)
88 }
89
90 #[tracing::instrument(skip(self, record))]
96 pub async fn update_one(&self, record: U, columns: Vec<&str>) -> Result<(), Error> {
97 let schema = U::ch_schema();
98 let record = record.into_ch_record();
99 let primary_fields = record.primary_fields();
100
101 let fields = record
102 .fields
103 .iter()
104 .filter_map(|f| {
105 if f.primary {
107 return None;
108 }
109 if !columns.is_empty() && !columns.contains(&f.id.as_str()) {
110 return None;
111 }
112 Some((f.id.as_str(), f.value.clone()))
113 })
114 .collect::<Vec<_>>();
115
116 let where_cond = Where::new(
117 format!(
118 "({})",
119 primary_fields
120 .iter()
121 .map(|f| f.id.as_str())
122 .collect::<Vec<_>>()
123 .join(", ")
124 )
125 .as_str(),
126 "IN",
127 format!(
128 "(({}))",
129 primary_fields
130 .iter()
131 .map(|f| { f.value.clone().to_sql_string() })
132 .collect::<Vec<_>>()
133 .join(", ")
134 )
135 .as_str(),
136 );
137
138 let _res = self
139 .client
140 .crud()
141 .update(&schema.name, fields, Some(where_cond))
142 .await?;
143 Ok(())
144 }
145
146 #[tracing::instrument(skip(self, records), fields(table = U::ch_schema().name))]
148 pub async fn delete(&self, records: Vec<U>) -> Result<(), Error> {
149 if records.is_empty() {
150 return Ok(());
151 }
152
153 let schema = U::ch_schema();
154 let mut primary_keys = vec![];
155 let mut primary_values = vec![];
156 for (i, record) in records.into_iter().enumerate() {
157 let record = record.into_ch_record();
158 let primary_fields = record.primary_fields();
159 if i == 0 {
160 primary_keys.extend(
161 primary_fields
162 .iter()
163 .map(|f| f.id.to_string())
164 .collect::<Vec<_>>(),
165 );
166 }
167 primary_values.push(
168 primary_fields
169 .iter()
170 .map(|f| f.value.clone())
171 .collect::<Vec<_>>(),
172 );
173 }
174
175 let where_cond = Where::new(
176 format!("({})", primary_keys.join(", ")).as_str(),
177 "IN",
178 format!(
179 "({})",
180 primary_values
181 .into_iter()
182 .map(|values| {
183 format!(
184 "({})",
185 values
186 .iter()
187 .map(|v| v.clone().to_sql_string())
188 .collect::<Vec<_>>()
189 .join(", ")
190 )
191 })
192 .collect::<Vec<_>>()
193 .join(", ")
194 )
195 .as_str(),
196 );
197
198 let _res = self
199 .client
200 .crud()
201 .delete(&schema.name, Some(where_cond))
202 .await?;
203 Ok(())
204 }
205}