clickhouse_client/query/crud/
mod.rs

1//! Query CRUD executor
2
3#[cfg(test)]
4mod tests;
5
6use crate::{error::Error, intf::Interface, value::Value, Client};
7
8use super::{Format, Query, QueryData, QueryResponse, Where};
9
10/// CRUD query
11#[derive(Debug)]
12pub struct CRUDQuery<'a, T>
13where
14    T: Interface,
15{
16    /// Client
17    client: &'a Client<T>,
18    /// Query
19    query: Query,
20}
21
22impl<T> Client<T>
23where
24    T: Interface,
25{
26    /// Prepares a [CRUDQuery] from a client
27    pub fn crud(&self) -> CRUDQuery<T> {
28        CRUDQuery {
29            client: self,
30            query: Query {
31                db: self.db.clone(),
32                credentials: self.credentials.clone(),
33                ..Default::default()
34            },
35        }
36    }
37}
38
39impl<'a, T> CRUDQuery<'a, T>
40where
41    T: Interface,
42{
43    /// Sets the query format
44    pub fn format(mut self, format: Format) -> Self {
45        self.query.format = Some(format);
46        self
47    }
48
49    /// Insert rows(s)
50    #[tracing::instrument(skip(self, data))]
51    pub async fn insert(self, table: &str, data: QueryData) -> Result<QueryResponse, Error> {
52        // NB: To pass the data inside the HTTP body, a FORMAT clause must be passed to the
53        // SQL statement explicitly.
54        let format = self.query.format.unwrap_or(Format::RowBinary);
55
56        let query = self
57            .query
58            .statement("INSERT INTO [??] FORMAT [??]")
59            .bind_str(table)
60            .bind_str(&format.to_string())
61            .format(format)
62            .data(data);
63        self.client.send(query).await
64    }
65
66    /// Select rows
67    ///
68    /// If the columns is empty, all columns are returned
69    #[tracing::instrument(skip(self))]
70    pub async fn select(
71        self,
72        table: &str,
73        fields: Vec<&str>,
74        where_cond: Option<Where>,
75    ) -> Result<QueryResponse, Error> {
76        let fields = if fields.is_empty() {
77            "*".to_string()
78        } else {
79            fields
80                .iter()
81                .map(|c| c.to_string())
82                .collect::<Vec<_>>()
83                .join(", ")
84        };
85
86        let query = self
87            .query
88            .statement("SELECT [??] FROM [??][??]")
89            .bind_str(&fields)
90            .bind_str(table)
91            .bind_str(where_cond.unwrap_or_default().to_string().as_str());
92        self.client.send(query).await
93    }
94
95    /// Update rows
96    #[tracing::instrument(skip(self))]
97    pub async fn update(
98        self,
99        table: &str,
100        fields: Vec<(&str, Value)>,
101        where_cond: Option<Where>,
102    ) -> Result<QueryResponse, Error> {
103        let fields = fields
104            .iter()
105            .map(|(k, v)| format!("{} = {}", k, v.clone().to_sql_string()))
106            .collect::<Vec<_>>()
107            .join(", ");
108
109        let query = self
110            .query
111            .statement("ALTER TABLE [??] UPDATE [??][??]")
112            .bind_str(table)
113            .bind_str(&fields)
114            .bind_str(&where_cond.unwrap_or_default().to_string());
115        self.client.send(query).await
116    }
117
118    /// Delete rows
119    ///
120    /// If the columns is empty, all columns are returned
121    #[tracing::instrument(skip(self))]
122    pub async fn delete(
123        self,
124        table: &str,
125        where_cond: Option<Where>,
126    ) -> Result<QueryResponse, Error> {
127        let query = self
128            .query
129            .statement("DELETE FROM [??][??]")
130            .bind_str(table)
131            .bind_str(&where_cond.unwrap_or_default().to_string());
132        self.client.send(query).await
133    }
134}