Skip to main content

clickhouse/
insert.rs

1use crate::insert_formatted::BufInsertFormatted;
2use crate::row_metadata::RowMetadata;
3use crate::rowbinary::{serialize_row_binary, serialize_with_validation};
4use crate::{
5    Client, RowWrite,
6    error::Result,
7    formats,
8    row::{self, Row},
9};
10use clickhouse_types::put_rbwnat_columns_header;
11use serde::Serialize;
12use std::{future::Future, marker::PhantomData, time::Duration};
13
/// Upper bound we aim for when sizing a single frame: 256 KiB.
const BUFFER_SIZE: usize = 1 << 18;

/// Number of buffered bytes at which the accumulated rows are sent as a chunk.
///
/// It sits 2 KiB below [`BUFFER_SIZE`] so that the row which pushes the buffer
/// over the threshold usually still fits without an extra reallocation.
const MIN_CHUNK_SIZE: usize = {
    // A power-of-two size lets the whole capacity of the buffer be used.
    assert!(BUFFER_SIZE.is_power_of_two());
    BUFFER_SIZE - 2 * 1024
};
23
/// Performs one `INSERT`.
///
/// [`Insert::end`] must be called to finalize the `INSERT`.
/// Otherwise, the whole `INSERT` will be aborted.
///
/// Rows are sent progressively to spread network load.
///
/// # Note: Metadata is Cached
/// If [validation is enabled][Client::with_validation],
/// this helper will query the metadata for the target table to learn the column names and types.
///
/// To avoid querying this metadata every time, it is cached within the [`Client`].
///
/// Any concurrent changes to the table schema may cause insert failures if the metadata
/// is no longer correct. For correct functioning, call [`Client::clear_cached_metadata()`]
/// after any changes to the current database schema.
#[must_use]
pub struct Insert<T> {
    // Buffered formatted insert: holds the serialization buffer and drives the
    // underlying request (start, flush, end, abort).
    insert: BufInsertFormatted,
    // Column metadata of the target table. When `Some`, rows are serialized with
    // validation and sent as `RowBinaryWithNamesAndTypes`; when `None`, plain
    // `RowBinary` is used. Presumably `Some` only when validation is enabled.
    row_metadata: Option<RowMetadata>,
    // Ties `Insert` to `T` without storing a `T`. `fn() -> T` is covariant in `T`
    // and is `Send + Sync` for every `T`.
    // TODO: test contravariance (note: `fn(T)`, not `fn() -> T`, would be contravariant).
    _marker: PhantomData<fn() -> T>,
}
46
47impl<T> Insert<T> {
48    pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<RowMetadata>) -> Self
49    where
50        T: Row,
51    {
52        let fields = row::join_column_names::<T>()
53            .expect("the row type must be a struct or a wrapper around it");
54
55        // TODO: what about escaping a table name?
56        // https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
57        let format = if row_metadata.is_some() {
58            formats::ROW_BINARY_WITH_NAMES_AND_TYPES
59        } else {
60            formats::ROW_BINARY
61        };
62        let sql = format!("INSERT INTO {table}({fields}) FORMAT {format}");
63
64        Self {
65            insert: client
66                .insert_formatted_with(sql)
67                .buffered_with_capacity(BUFFER_SIZE),
68            row_metadata,
69            _marker: PhantomData,
70        }
71    }
72
73    /// Converts this typed insert into a type-erased [`Insert<()>`], preserving any
74    /// row metadata (and thus schema validation) that was fetched for `T`.
75    ///
76    /// This lets you to store `Insert<()>` in a container, for example
77    /// `HashMap<std::any::TypeId, Insert<()>>`.
78    ///
79    /// ```ignore
80    /// let mut insert = client.insert::<MyRow>("table").await?.into_any();
81    /// insert.write_any(&MyRow { ... }).await?;
82    /// insert.end().await?;
83    /// ```
84    pub fn into_any(self) -> Insert<()> {
85        Insert {
86            insert: self.insert,
87            row_metadata: self.row_metadata,
88            _marker: PhantomData,
89        }
90    }
91
92    /// Sets timeouts for different operations.
93    ///
94    /// `send_timeout` restricts time on sending a data chunk to a socket.
95    /// `None` disables the timeout, it's a default.
96    /// It's roughly equivalent to `tokio::time::timeout(insert.write(...))`.
97    ///
98    /// `end_timeout` restricts time on waiting for a response from the CH
99    /// server. Thus, it includes all work needed to handle `INSERT` by the
100    /// CH server, e.g. handling all materialized views and so on.
101    /// `None` disables the timeout, it's a default.
102    /// It's roughly equivalent to `tokio::time::timeout(insert.end(...))`.
103    ///
104    /// These timeouts are much more performant (~x10) than wrapping `write()`
105    /// and `end()` calls into `tokio::time::timeout()`.
106    pub fn with_timeouts(
107        mut self,
108        send_timeout: Option<Duration>,
109        end_timeout: Option<Duration>,
110    ) -> Self {
111        self.set_timeouts(send_timeout, end_timeout);
112        self
113    }
114
115    /// Configure the [roles] to use when executing `INSERT` statements.
116    ///
117    /// Overrides any roles previously set by this method, [`Insert::with_option`],
118    /// [`Client::with_roles`] or [`Client::with_option`].
119    ///
120    /// An empty iterator may be passed to clear the set roles.
121    ///
122    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
123    ///
124    /// # Panics
125    /// If called after the request is started, e.g., after [`Insert::write`].
126    pub fn with_roles(mut self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
127        self.insert.expect_client_mut().set_roles(roles);
128        self
129    }
130
131    /// Clear any explicit [roles] previously set on this `Insert` or inherited from [`Client`].
132    ///
133    /// Overrides any roles previously set by [`Insert::with_roles`], [`Insert::with_option`],
134    /// [`Client::with_roles`] or [`Client::with_option`].
135    ///
136    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
137    ///
138    /// # Panics
139    /// If called after the request is started, e.g., after [`Insert::write`].
140    pub fn with_default_roles(mut self) -> Self {
141        self.insert.expect_client_mut().clear_roles();
142        self
143    }
144
145    /// Similar to [`Client::with_option`], but for this particular INSERT
146    /// statement only.
147    ///
148    /// # Panics
149    /// If called after the request is started, e.g., after [`Insert::write`].
150    #[track_caller]
151    pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
152        self.insert.expect_client_mut().set_option(name, value);
153        self
154    }
155
156    pub(crate) fn set_timeouts(
157        &mut self,
158        send_timeout: Option<Duration>,
159        end_timeout: Option<Duration>,
160    ) {
161        self.insert.set_timeouts(send_timeout, end_timeout);
162    }
163
164    /// Serializes the provided row into an internal buffer.
165    /// Once the buffer is full, it's sent to a background task writing to the
166    /// socket.
167    ///
168    /// Close to:
169    ///
170    /// ```ignore
171    /// async fn write<T>(&self, row: &T) -> Result<usize>;
172    /// ```
173    ///
174    /// A returned future doesn't depend on the row's lifetime.
175    ///
176    /// Returns an error if the row cannot be serialized or the background task
177    /// failed. Once failed, the whole `INSERT` is aborted and cannot be
178    /// used anymore.
179    ///
180    /// # Panics
181    ///
182    /// If called after the previous call that returned an error.
183    pub fn write<'a>(
184        &'a mut self,
185        row: &T::Value<'_>,
186    ) -> impl Future<Output = Result<()>> + 'a + Send
187    where
188        T: RowWrite,
189    {
190        let result = self.do_write(row);
191
192        async move {
193            result?;
194            if self.insert.buf_len() >= MIN_CHUNK_SIZE {
195                self.insert.flush().await?;
196            }
197            Ok(())
198        }
199    }
200
201    /// Writes a row of any type `R` into this insert.
202    ///
203    /// Unlike [`Insert::write`], the row type `R` is inferred directly from the
204    /// argument, no turbofish needed:
205    /// ```ignore
206    /// insert.write_any(&MyRow { ... }).await?;
207    /// ```
208    ///
209    /// This is the primary write method for type-erased [`Insert<()>`] obtained
210    /// from [`Client::insert_any`].
211    pub fn write_any<'a, R>(&'a mut self, row: &R) -> impl Future<Output = Result<()>> + 'a + Send
212    where
213        R: Row + Serialize,
214    {
215        let result = self.do_write_any(row);
216
217        async move {
218            result?;
219            if self.insert.buf_len() >= MIN_CHUNK_SIZE {
220                self.insert.flush().await?;
221            }
222            Ok(())
223        }
224    }
225
226    /// Returns the number of bytes written, not including the RBWNAT header.
227    #[inline(always)]
228    pub(crate) fn do_write(&mut self, row: &T::Value<'_>) -> Result<usize>
229    where
230        T: RowWrite,
231    {
232        self.do_write_any(row)
233    }
234
235    #[inline(always)]
236    pub(crate) fn do_write_any<R>(&mut self, row: &R) -> Result<usize>
237    where
238        R: Row + Serialize,
239    {
240        // We don't want to wait for the buffer to be full before we start the request,
241        // in the event of an error.
242        self.init_request_if_required()?;
243
244        // The following calls need an `impl BufMut`
245        let buffer = self.insert.buffer_mut();
246
247        let old_buf_size = buffer.len();
248        let result = match &self.row_metadata {
249            Some(metadata) => serialize_with_validation(&mut *buffer, row, metadata),
250            None => serialize_row_binary(&mut *buffer, row),
251        };
252        let written = buffer.len() - old_buf_size;
253
254        if result.is_err() {
255            self.abort();
256        }
257
258        result.and(Ok(written))
259    }
260
261    /// Ends `INSERT`, the server starts processing the data.
262    ///
263    /// Succeeds if the server returns 200, that means the `INSERT` was handled
264    /// successfully, including all materialized views and quorum writes.
265    ///
266    /// NOTE: If it isn't called, the whole `INSERT` is aborted.
267    pub async fn end(mut self) -> Result<()> {
268        self.insert.end().await
269    }
270
271    fn init_request_if_required(&mut self) -> Result<()> {
272        let fresh_request = self.insert.init_request_if_required()?;
273
274        if fresh_request && let Some(metadata) = &self.row_metadata {
275            put_rbwnat_columns_header(&metadata.columns, self.insert.buffer_mut())
276                .inspect_err(|_| self.abort())?;
277        }
278
279        Ok(())
280    }
281
282    fn abort(&mut self) {
283        self.insert.abort();
284    }
285}