clickhouse_client/orm/
query.rs

1//! ORM queries
2//!
3//! ORM queries use the `BinaryRow` format to get data from and into the DB.
4
5use std::marker::PhantomData;
6
7use crate::{
8    error::Error,
9    intf::Interface,
10    query::{Format, Where},
11    Client,
12};
13
14use super::ChRecord;
15
16/// ORM query
17pub struct OrmQuery<'a, T, U>
18where
19    T: Interface,
20    U: ChRecord,
21{
22    /// Client
23    client: &'a Client<T>,
24    /// Record
25    record: PhantomData<U>,
26}
27
28impl<T> Client<T>
29where
30    T: Interface,
31{
32    /// Instantiates a [OrmQuery] instance
33    pub fn orm<U>(&self) -> OrmQuery<T, U>
34    where
35        U: ChRecord,
36    {
37        OrmQuery {
38            client: self,
39            record: PhantomData,
40        }
41    }
42}
43
44impl<'a, T, U> OrmQuery<'a, T, U>
45where
46    T: Interface,
47    U: ChRecord,
48{
49    /// Creates the record table
50    #[tracing::instrument(skip(self), fields(table = U::ch_schema().name))]
51    pub async fn create_table(&self, engine: &str) -> Result<(), Error> {
52        let schema = U::ch_schema();
53        self.client.ddl().create_table(&schema, engine).await
54    }
55
56    /// Drops the record table
57    #[tracing::instrument(skip(self), fields(table = U::ch_schema().name))]
58    pub async fn drop_table(&self) -> Result<(), Error> {
59        let schema = U::ch_schema();
60        self.client.ddl().drop_table(&schema.name).await
61    }
62
63    /// Inserts records
64    #[tracing::instrument(skip(self, records), fields(table = U::ch_schema().name))]
65    pub async fn insert(&self, records: Vec<U>) -> Result<(), Error> {
66        let schema = U::ch_schema();
67        let table = U::to_query_data(records);
68        let _res = self.client.crud().insert(&schema.name, table).await?;
69        Ok(())
70    }
71
72    /// Selects 1 or several records
73    ///
74    /// # Arguments
75    ///
76    /// - if cols is empty, all fields are retrieved
77    #[tracing::instrument(skip(self))]
78    pub async fn select(&self, where_cond: Option<Where>) -> Result<Vec<U>, Error> {
79        let schema = U::ch_schema();
80        let table = self
81            .client
82            .crud()
83            .format(Format::RowBinaryWithNamesAndTypes)
84            .select(&schema.name, vec![], where_cond)
85            .await?
86            .into_table(None)?;
87        U::from_query_data(table)
88    }
89
90    /// Updates a record
91    ///
92    /// # Arguments
93    ///
94    /// - columns: if columns are provided, only those columns are updated
95    #[tracing::instrument(skip(self, record))]
96    pub async fn update_one(&self, record: U, columns: Vec<&str>) -> Result<(), Error> {
97        let schema = U::ch_schema();
98        let record = record.into_ch_record();
99        let primary_fields = record.primary_fields();
100
101        let fields = record
102            .fields
103            .iter()
104            .filter_map(|f| {
105                // primary fields cannot be updated
106                if f.primary {
107                    return None;
108                }
109                if !columns.is_empty() && !columns.contains(&f.id.as_str()) {
110                    return None;
111                }
112                Some((f.id.as_str(), f.value.clone()))
113            })
114            .collect::<Vec<_>>();
115
116        let where_cond = Where::new(
117            format!(
118                "({})",
119                primary_fields
120                    .iter()
121                    .map(|f| f.id.as_str())
122                    .collect::<Vec<_>>()
123                    .join(", ")
124            )
125            .as_str(),
126            "IN",
127            format!(
128                "(({}))",
129                primary_fields
130                    .iter()
131                    .map(|f| { f.value.clone().to_sql_string() })
132                    .collect::<Vec<_>>()
133                    .join(", ")
134            )
135            .as_str(),
136        );
137
138        let _res = self
139            .client
140            .crud()
141            .update(&schema.name, fields, Some(where_cond))
142            .await?;
143        Ok(())
144    }
145
146    /// Delete records
147    #[tracing::instrument(skip(self, records), fields(table = U::ch_schema().name))]
148    pub async fn delete(&self, records: Vec<U>) -> Result<(), Error> {
149        if records.is_empty() {
150            return Ok(());
151        }
152
153        let schema = U::ch_schema();
154        let mut primary_keys = vec![];
155        let mut primary_values = vec![];
156        for (i, record) in records.into_iter().enumerate() {
157            let record = record.into_ch_record();
158            let primary_fields = record.primary_fields();
159            if i == 0 {
160                primary_keys.extend(
161                    primary_fields
162                        .iter()
163                        .map(|f| f.id.to_string())
164                        .collect::<Vec<_>>(),
165                );
166            }
167            primary_values.push(
168                primary_fields
169                    .iter()
170                    .map(|f| f.value.clone())
171                    .collect::<Vec<_>>(),
172            );
173        }
174
175        let where_cond = Where::new(
176            format!("({})", primary_keys.join(", ")).as_str(),
177            "IN",
178            format!(
179                "({})",
180                primary_values
181                    .into_iter()
182                    .map(|values| {
183                        format!(
184                            "({})",
185                            values
186                                .iter()
187                                .map(|v| v.clone().to_sql_string())
188                                .collect::<Vec<_>>()
189                                .join(", ")
190                        )
191                    })
192                    .collect::<Vec<_>>()
193                    .join(", ")
194            )
195            .as_str(),
196        );
197
198        let _res = self
199            .client
200            .crud()
201            .delete(&schema.name, Some(where_cond))
202            .await?;
203        Ok(())
204    }
205}