clickhouse/insert.rs
1use crate::insert_formatted::{BufInsertFormatted, InsertFormatted};
2use crate::row_metadata::RowMetadata;
3use crate::rowbinary::{serialize_row_binary, serialize_with_validation};
4use crate::{
5 Client, RowWrite,
6 error::Result,
7 formats,
8 row::{self, Row},
9};
10use clickhouse_types::put_rbwnat_columns_header;
11use serde::Serialize;
12use std::num::Saturating;
13use std::{future::Future, marker::PhantomData, time::Duration};
14
// The desired max frame size.
const BUFFER_SIZE: usize = 256 * 1024;

// Compile-time guard: the buffer size must stay a power of two
// (to use the whole buffer's capacity).
const _: () = assert!(BUFFER_SIZE.is_power_of_two());

// Threshold to send a chunk. Should be slightly less than `BUFFER_SIZE`
// to avoid extra reallocations in case of a big last row.
const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 2048;
24
/// Performs one `INSERT`.
///
/// [`Insert::end`] must be called to finalize the `INSERT`.
/// Otherwise, the whole `INSERT` will be aborted.
///
/// Rows are sent progressively to spread network load.
///
/// # Note: Metadata is Cached
/// If [validation is enabled][Client::with_validation],
/// this helper will query the metadata for the target table to learn the column names and types.
///
/// To avoid querying this metadata every time, it is cached within the [`Client`].
///
/// Any concurrent changes to the table schema may cause insert failures if the metadata
/// is no longer correct. For correct functioning, call [`Client::clear_cached_metadata()`]
/// after any changes to the current database schema.
#[must_use]
pub struct Insert<T> {
    // The underlying buffered request that rows are serialized into.
    insert: BufInsertFormatted,
    // Column metadata used to validate rows while serializing; when present,
    // the names-and-types flavour of the row-binary format is used.
    row_metadata: Option<RowMetadata>,
    // Number of rows counted as sent; reported on the request span by `end`.
    // Saturates instead of overflowing.
    sent_rows: Saturating<u64>,
    // Ties the struct to `T` without storing a `T`.
    // TODO: test variance (`fn() -> T` is covariant in `T`, not contravariant).
    _marker: PhantomData<fn() -> T>,
}
48
49impl<T> Insert<T> {
50 pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<RowMetadata>) -> Self
51 where
52 T: Row,
53 {
54 let fields = row::join_column_names::<T>()
55 .expect("the row type must be a struct or a wrapper around it");
56
57 let format = if row_metadata.is_some() {
58 formats::ROW_BINARY_WITH_NAMES_AND_TYPES
59 } else {
60 formats::ROW_BINARY
61 };
62 let sql = format!("INSERT INTO {table}({fields}) FORMAT {format}");
63
64 Self {
65 insert: InsertFormatted::new(client, sql, Some(table))
66 .buffered_with_capacity(BUFFER_SIZE),
67 row_metadata,
68 sent_rows: Saturating(0),
69 _marker: PhantomData,
70 }
71 }
72
73 /// Converts this typed insert into a type-erased [`Insert<()>`], preserving any
74 /// row metadata (and thus schema validation) that was fetched for `T`.
75 ///
76 /// This lets you to store `Insert<()>` in a container, for example
77 /// `HashMap<std::any::TypeId, Insert<()>>`.
78 ///
79 /// ```ignore
80 /// let mut insert = client.insert::<MyRow>("table").await?.into_any();
81 /// insert.write_any(&MyRow { ... }).await?;
82 /// insert.end().await?;
83 /// ```
84 pub fn into_any(self) -> Insert<()> {
85 Insert {
86 insert: self.insert,
87 row_metadata: self.row_metadata,
88 sent_rows: self.sent_rows,
89 _marker: PhantomData,
90 }
91 }
92
93 /// Sets timeouts for different operations.
94 ///
95 /// `send_timeout` restricts time on sending a data chunk to a socket.
96 /// `None` disables the timeout, it's a default.
97 /// It's roughly equivalent to `tokio::time::timeout(insert.write(...))`.
98 ///
99 /// `end_timeout` restricts time on waiting for a response from the CH
100 /// server. Thus, it includes all work needed to handle `INSERT` by the
101 /// CH server, e.g. handling all materialized views and so on.
102 /// `None` disables the timeout, it's a default.
103 /// It's roughly equivalent to `tokio::time::timeout(insert.end(...))`.
104 ///
105 /// These timeouts are much more performant (~x10) than wrapping `write()`
106 /// and `end()` calls into `tokio::time::timeout()`.
107 pub fn with_timeouts(
108 mut self,
109 send_timeout: Option<Duration>,
110 end_timeout: Option<Duration>,
111 ) -> Self {
112 self.set_timeouts(send_timeout, end_timeout);
113 self
114 }
115
116 /// Configure the [roles] to use when executing `INSERT` statements.
117 ///
118 /// Overrides any roles previously set by this method, [`Insert::with_setting`],
119 /// [`Client::with_roles`] or [`Client::with_setting`].
120 ///
121 /// An empty iterator may be passed to clear the set roles.
122 ///
123 /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
124 ///
125 /// # Panics
126 /// If called after the request is started, e.g., after [`Insert::write`].
127 pub fn with_roles(mut self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
128 self.insert.expect_client_mut().set_roles(roles);
129 self
130 }
131
132 /// Clear any explicit [roles] previously set on this `Insert` or inherited from [`Client`].
133 ///
134 /// Overrides any roles previously set by [`Insert::with_roles`], [`Insert::with_setting`],
135 /// [`Client::with_roles`] or [`Client::with_setting`].
136 ///
137 /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
138 ///
139 /// # Panics
140 /// If called after the request is started, e.g., after [`Insert::write`].
141 pub fn with_default_roles(mut self) -> Self {
142 self.insert.expect_client_mut().clear_roles();
143 self
144 }
145
146 /// Similar to [`Client::with_option`], but for this particular INSERT
147 /// statement only.
148 ///
149 /// # Panics
150 /// If called after the request is started, e.g., after [`Insert::write`].
151 #[track_caller]
152 #[deprecated(since = "0.14.3", note = "please use `with_setting` instead")]
153 pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
154 self.insert.expect_client_mut().set_setting(name, value);
155 self
156 }
157
158 /// Similar to [`Client::with_setting`], but for this particular INSERT
159 /// statement only.
160 ///
161 /// # Panics
162 /// If called after the request is started, e.g., after [`Insert::write`].
163 #[track_caller]
164 pub fn with_setting(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
165 self.insert.expect_client_mut().set_setting(name, value);
166 self
167 }
168
169 pub(crate) fn set_timeouts(
170 &mut self,
171 send_timeout: Option<Duration>,
172 end_timeout: Option<Duration>,
173 ) {
174 self.insert.set_timeouts(send_timeout, end_timeout);
175 }
176
177 /// Serializes the provided row into an internal buffer.
178 /// Once the buffer is full, it's sent to a background task writing to the
179 /// socket.
180 ///
181 /// Close to:
182 ///
183 /// ```ignore
184 /// async fn write<T>(&self, row: &T) -> Result<usize>;
185 /// ```
186 ///
187 /// A returned future doesn't depend on the row's lifetime.
188 ///
189 /// Returns an error if the row cannot be serialized or the background task
190 /// failed. Once failed, the whole `INSERT` is aborted and cannot be
191 /// used anymore.
192 ///
193 /// # Panics
194 ///
195 /// If called after the previous call that returned an error.
196 pub fn write<'a>(
197 &'a mut self,
198 row: &T::Value<'_>,
199 ) -> impl Future<Output = Result<()>> + 'a + Send
200 where
201 T: RowWrite,
202 {
203 let result = self.do_write(row);
204
205 async move {
206 result?;
207 if self.insert.buf_len() >= MIN_CHUNK_SIZE {
208 self.insert.flush().await?;
209 }
210
211 self.sent_rows += 1;
212
213 Ok(())
214 }
215 }
216
217 /// Writes a row of any type `R` into this insert.
218 ///
219 /// Unlike [`Insert::write`], the row type `R` is inferred directly from the
220 /// argument, no turbofish needed:
221 /// ```ignore
222 /// insert.write_any(&MyRow { ... }).await?;
223 /// ```
224 ///
225 /// This is the primary write method for type-erased [`Insert<()>`] obtained
226 /// from [`Client::insert_any`].
227 pub fn write_any<'a, R>(&'a mut self, row: &R) -> impl Future<Output = Result<()>> + 'a + Send
228 where
229 R: Row + Serialize,
230 {
231 let result = self.do_write_any(row);
232
233 async move {
234 result?;
235 if self.insert.buf_len() >= MIN_CHUNK_SIZE {
236 self.insert.flush().await?;
237 }
238 Ok(())
239 }
240 }
241
242 /// Returns the number of bytes written, not including the RBWNAT header.
243 #[inline(always)]
244 pub(crate) fn do_write(&mut self, row: &T::Value<'_>) -> Result<usize>
245 where
246 T: RowWrite,
247 {
248 self.do_write_any(row)
249 }
250
251 #[inline(always)]
252 pub(crate) fn do_write_any<R>(&mut self, row: &R) -> Result<usize>
253 where
254 R: Row + Serialize,
255 {
256 // We don't want to wait for the buffer to be full before we start the request,
257 // in the event of an error.
258 self.init_request_if_required()?;
259
260 // The following calls need an `impl BufMut`
261 let buffer = self.insert.buffer_mut();
262
263 let old_buf_size = buffer.len();
264 let result = match &self.row_metadata {
265 Some(metadata) => serialize_with_validation(&mut *buffer, row, metadata),
266 None => serialize_row_binary(&mut *buffer, row),
267 };
268 let written = buffer.len() - old_buf_size;
269
270 if let Err(e) = &result {
271 e.record_in_current_span("error serializing row");
272 self.abort();
273 }
274
275 result.and(Ok(written))
276 }
277
278 /// Ends `INSERT`, the server starts processing the data.
279 ///
280 /// Succeeds if the server returns 200, that means the `INSERT` was handled
281 /// successfully, including all materialized views and quorum writes.
282 ///
283 /// NOTE: If it isn't called, the whole `INSERT` is aborted.
284 pub async fn end(mut self) -> Result<()> {
285 // `InsertFormatted::end()` will add `sent_bytes` and `encoded_bytes` to the span.
286 tracing::record_all!(
287 self.insert.span(),
288 clickhouse.request.sent_rows = self.sent_rows.0,
289 );
290
291 self.insert.end().await
292 }
293
294 fn init_request_if_required(&mut self) -> Result<()> {
295 let fresh_request = self.insert.init_request_if_required()?;
296
297 if fresh_request && let Some(metadata) = &self.row_metadata {
298 put_rbwnat_columns_header(&metadata.columns, self.insert.buffer_mut())
299 .inspect_err(|_| self.abort())?;
300 }
301
302 Ok(())
303 }
304
305 fn abort(&mut self) {
306 self.insert.abort();
307 }
308}