Skip to main content

clickhouse/
insert.rs

1use crate::insert_formatted::{BufInsertFormatted, InsertFormatted};
2use crate::row_metadata::RowMetadata;
3use crate::rowbinary::{serialize_row_binary, serialize_with_validation};
4use crate::{
5    Client, RowWrite,
6    error::Result,
7    formats,
8    row::{self, Row},
9};
10use clickhouse_types::put_rbwnat_columns_header;
11use serde::Serialize;
12use std::num::Saturating;
13use std::{future::Future, marker::PhantomData, time::Duration};
14
// Desired maximum size of one frame sent to the server, in bytes.
const BUFFER_SIZE: usize = 256 * 1024;

// The buffer size must stay a power of two so that the whole buffer capacity is used.
const _: () = assert!(BUFFER_SIZE.is_power_of_two());

// Buffer fill level at which a chunk is sent. It sits a little below `BUFFER_SIZE`,
// leaving headroom so a big last row doesn't trigger extra reallocations.
const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 2048;
24
25/// Performs one `INSERT`.
26///
27/// The [`Insert::end`] must be called to finalize the `INSERT`.
28/// Otherwise, the whole `INSERT` will be aborted.
29///
30/// Rows are sent progressively to spread network load.
31///
32/// # Note: Metadata is Cached
33/// If [validation is enabled][Client::with_validation],
34/// this helper will query the metadata for the target table to learn the column names and types.
35///
36/// To avoid querying this metadata every time, it is cached within the [`Client`].
37///
38/// Any concurrent changes to the table schema may cause insert failures if the metadata
39/// is no longer correct. For correct functioning, call [`Client::clear_cached_metadata()`]
40/// after any changes to the current database schema.
41#[must_use]
42pub struct Insert<T> {
43    insert: BufInsertFormatted,
44    row_metadata: Option<RowMetadata>,
45    sent_rows: Saturating<u64>,
46    _marker: PhantomData<fn() -> T>, // TODO: test contravariance.
47}
48
49impl<T> Insert<T> {
50    pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<RowMetadata>) -> Self
51    where
52        T: Row,
53    {
54        let fields = row::join_column_names::<T>()
55            .expect("the row type must be a struct or a wrapper around it");
56
57        let format = if row_metadata.is_some() {
58            formats::ROW_BINARY_WITH_NAMES_AND_TYPES
59        } else {
60            formats::ROW_BINARY
61        };
62        let sql = format!("INSERT INTO {table}({fields}) FORMAT {format}");
63
64        Self {
65            insert: InsertFormatted::new(client, sql, Some(table))
66                .buffered_with_capacity(BUFFER_SIZE),
67            row_metadata,
68            sent_rows: Saturating(0),
69            _marker: PhantomData,
70        }
71    }
72
73    /// Converts this typed insert into a type-erased [`Insert<()>`], preserving any
74    /// row metadata (and thus schema validation) that was fetched for `T`.
75    ///
76    /// This lets you to store `Insert<()>` in a container, for example
77    /// `HashMap<std::any::TypeId, Insert<()>>`.
78    ///
79    /// ```ignore
80    /// let mut insert = client.insert::<MyRow>("table").await?.into_any();
81    /// insert.write_any(&MyRow { ... }).await?;
82    /// insert.end().await?;
83    /// ```
84    pub fn into_any(self) -> Insert<()> {
85        Insert {
86            insert: self.insert,
87            row_metadata: self.row_metadata,
88            sent_rows: self.sent_rows,
89            _marker: PhantomData,
90        }
91    }
92
93    /// Sets timeouts for different operations.
94    ///
95    /// `send_timeout` restricts time on sending a data chunk to a socket.
96    /// `None` disables the timeout, it's a default.
97    /// It's roughly equivalent to `tokio::time::timeout(insert.write(...))`.
98    ///
99    /// `end_timeout` restricts time on waiting for a response from the CH
100    /// server. Thus, it includes all work needed to handle `INSERT` by the
101    /// CH server, e.g. handling all materialized views and so on.
102    /// `None` disables the timeout, it's a default.
103    /// It's roughly equivalent to `tokio::time::timeout(insert.end(...))`.
104    ///
105    /// These timeouts are much more performant (~x10) than wrapping `write()`
106    /// and `end()` calls into `tokio::time::timeout()`.
107    pub fn with_timeouts(
108        mut self,
109        send_timeout: Option<Duration>,
110        end_timeout: Option<Duration>,
111    ) -> Self {
112        self.set_timeouts(send_timeout, end_timeout);
113        self
114    }
115
116    /// Configure the [roles] to use when executing `INSERT` statements.
117    ///
118    /// Overrides any roles previously set by this method, [`Insert::with_setting`],
119    /// [`Client::with_roles`] or [`Client::with_setting`].
120    ///
121    /// An empty iterator may be passed to clear the set roles.
122    ///
123    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
124    ///
125    /// # Panics
126    /// If called after the request is started, e.g., after [`Insert::write`].
127    pub fn with_roles(mut self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
128        self.insert.expect_client_mut().set_roles(roles);
129        self
130    }
131
132    /// Clear any explicit [roles] previously set on this `Insert` or inherited from [`Client`].
133    ///
134    /// Overrides any roles previously set by [`Insert::with_roles`], [`Insert::with_setting`],
135    /// [`Client::with_roles`] or [`Client::with_setting`].
136    ///
137    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
138    ///
139    /// # Panics
140    /// If called after the request is started, e.g., after [`Insert::write`].
141    pub fn with_default_roles(mut self) -> Self {
142        self.insert.expect_client_mut().clear_roles();
143        self
144    }
145
146    /// Similar to [`Client::with_option`], but for this particular INSERT
147    /// statement only.
148    ///
149    /// # Panics
150    /// If called after the request is started, e.g., after [`Insert::write`].
151    #[track_caller]
152    #[deprecated(since = "0.14.3", note = "please use `with_setting` instead")]
153    pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
154        self.insert.expect_client_mut().set_setting(name, value);
155        self
156    }
157
158    /// Similar to [`Client::with_setting`], but for this particular INSERT
159    /// statement only.
160    ///
161    /// # Panics
162    /// If called after the request is started, e.g., after [`Insert::write`].
163    #[track_caller]
164    pub fn with_setting(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
165        self.insert.expect_client_mut().set_setting(name, value);
166        self
167    }
168
169    pub(crate) fn set_timeouts(
170        &mut self,
171        send_timeout: Option<Duration>,
172        end_timeout: Option<Duration>,
173    ) {
174        self.insert.set_timeouts(send_timeout, end_timeout);
175    }
176
177    /// Serializes the provided row into an internal buffer.
178    /// Once the buffer is full, it's sent to a background task writing to the
179    /// socket.
180    ///
181    /// Close to:
182    ///
183    /// ```ignore
184    /// async fn write<T>(&self, row: &T) -> Result<usize>;
185    /// ```
186    ///
187    /// A returned future doesn't depend on the row's lifetime.
188    ///
189    /// Returns an error if the row cannot be serialized or the background task
190    /// failed. Once failed, the whole `INSERT` is aborted and cannot be
191    /// used anymore.
192    ///
193    /// # Panics
194    ///
195    /// If called after the previous call that returned an error.
196    pub fn write<'a>(
197        &'a mut self,
198        row: &T::Value<'_>,
199    ) -> impl Future<Output = Result<()>> + 'a + Send
200    where
201        T: RowWrite,
202    {
203        let result = self.do_write(row);
204
205        async move {
206            result?;
207            if self.insert.buf_len() >= MIN_CHUNK_SIZE {
208                self.insert.flush().await?;
209            }
210
211            self.sent_rows += 1;
212
213            Ok(())
214        }
215    }
216
217    /// Writes a row of any type `R` into this insert.
218    ///
219    /// Unlike [`Insert::write`], the row type `R` is inferred directly from the
220    /// argument, no turbofish needed:
221    /// ```ignore
222    /// insert.write_any(&MyRow { ... }).await?;
223    /// ```
224    ///
225    /// This is the primary write method for type-erased [`Insert<()>`] obtained
226    /// from [`Client::insert_any`].
227    pub fn write_any<'a, R>(&'a mut self, row: &R) -> impl Future<Output = Result<()>> + 'a + Send
228    where
229        R: Row + Serialize,
230    {
231        let result = self.do_write_any(row);
232
233        async move {
234            result?;
235            if self.insert.buf_len() >= MIN_CHUNK_SIZE {
236                self.insert.flush().await?;
237            }
238            Ok(())
239        }
240    }
241
242    /// Returns the number of bytes written, not including the RBWNAT header.
243    #[inline(always)]
244    pub(crate) fn do_write(&mut self, row: &T::Value<'_>) -> Result<usize>
245    where
246        T: RowWrite,
247    {
248        self.do_write_any(row)
249    }
250
251    #[inline(always)]
252    pub(crate) fn do_write_any<R>(&mut self, row: &R) -> Result<usize>
253    where
254        R: Row + Serialize,
255    {
256        // We don't want to wait for the buffer to be full before we start the request,
257        // in the event of an error.
258        self.init_request_if_required()?;
259
260        // The following calls need an `impl BufMut`
261        let buffer = self.insert.buffer_mut();
262
263        let old_buf_size = buffer.len();
264        let result = match &self.row_metadata {
265            Some(metadata) => serialize_with_validation(&mut *buffer, row, metadata),
266            None => serialize_row_binary(&mut *buffer, row),
267        };
268        let written = buffer.len() - old_buf_size;
269
270        if let Err(e) = &result {
271            e.record_in_current_span("error serializing row");
272            self.abort();
273        }
274
275        result.and(Ok(written))
276    }
277
278    /// Ends `INSERT`, the server starts processing the data.
279    ///
280    /// Succeeds if the server returns 200, that means the `INSERT` was handled
281    /// successfully, including all materialized views and quorum writes.
282    ///
283    /// NOTE: If it isn't called, the whole `INSERT` is aborted.
284    pub async fn end(mut self) -> Result<()> {
285        // `InsertFormatted::end()` will add `sent_bytes` and `encoded_bytes` to the span.
286        tracing::record_all!(
287            self.insert.span(),
288            clickhouse.request.sent_rows = self.sent_rows.0,
289        );
290
291        self.insert.end().await
292    }
293
294    fn init_request_if_required(&mut self) -> Result<()> {
295        let fresh_request = self.insert.init_request_if_required()?;
296
297        if fresh_request && let Some(metadata) = &self.row_metadata {
298            put_rbwnat_columns_header(&metadata.columns, self.insert.buffer_mut())
299                .inspect_err(|_| self.abort())?;
300        }
301
302        Ok(())
303    }
304
305    fn abort(&mut self) {
306        self.insert.abort();
307    }
308}