// clickhouse/insert.rs
1use crate::insert_formatted::BufInsertFormatted;
2use crate::row_metadata::RowMetadata;
3use crate::rowbinary::{serialize_row_binary, serialize_with_validation};
4use crate::{
5 Client, RowWrite,
6 error::Result,
7 formats,
8 row::{self, Row},
9};
10use clickhouse_types::put_rbwnat_columns_header;
11use serde::Serialize;
12use std::{future::Future, marker::PhantomData, time::Duration};
13
/// Target capacity of a single in-memory frame sent to the socket (256 KiB).
const BUFFER_SIZE: usize = 1 << 18;

/// Flush threshold for the buffer. It sits slightly below `BUFFER_SIZE`
/// so that one unusually large trailing row does not push the buffer past
/// its capacity and trigger a reallocation.
const MIN_CHUNK_SIZE: usize = const {
    // A power-of-two capacity lets the allocator serve the whole buffer.
    assert!(BUFFER_SIZE.is_power_of_two());
    BUFFER_SIZE - 2048
};
23
/// Performs one `INSERT`.
///
/// The [`Insert::end`] must be called to finalize the `INSERT`.
/// Otherwise, the whole `INSERT` will be aborted.
///
/// Rows are sent progressively to spread network load.
///
/// # Note: Metadata is Cached
/// If [validation is enabled][Client::with_validation],
/// this helper will query the metadata for the target table to learn the column names and types.
///
/// To avoid querying this metadata every time, it is cached within the [`Client`].
///
/// Any concurrent changes to the table schema may cause insert failures if the metadata
/// is no longer correct. For correct functioning, call [`Client::clear_cached_metadata()`]
/// after any changes to the current database schema.
#[must_use]
pub struct Insert<T> {
    // Buffered, format-aware transport; rows are serialized into its buffer
    // and flushed to the socket once `MIN_CHUNK_SIZE` is reached.
    insert: BufInsertFormatted,
    // Column names/types for the target table. `Some` enables the
    // RowBinaryWithNamesAndTypes format with validation; `None` means plain RowBinary.
    row_metadata: Option<RowMetadata>,
    // Ties the insert to the row type `T` without storing a `T`.
    _marker: PhantomData<fn() -> T>, // TODO: test contravariance.
}
46
impl<T> Insert<T> {
    /// Builds an `INSERT INTO {table}({fields}) FORMAT …` statement for row type `T`.
    ///
    /// When `row_metadata` is `Some`, the `RowBinaryWithNamesAndTypes` format is
    /// selected (enabling schema validation); otherwise plain `RowBinary` is used.
    pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<RowMetadata>) -> Self
    where
        T: Row,
    {
        let fields = row::join_column_names::<T>()
            .expect("the row type must be a struct or a wrapper around it");

        // TODO: what about escaping a table name?
        // https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
        let format = if row_metadata.is_some() {
            formats::ROW_BINARY_WITH_NAMES_AND_TYPES
        } else {
            formats::ROW_BINARY
        };
        let sql = format!("INSERT INTO {table}({fields}) FORMAT {format}");

        Self {
            insert: client
                .insert_formatted_with(sql)
                .buffered_with_capacity(BUFFER_SIZE),
            row_metadata,
            _marker: PhantomData,
        }
    }

    /// Converts this typed insert into a type-erased [`Insert<()>`], preserving any
    /// row metadata (and thus schema validation) that was fetched for `T`.
    ///
    /// This lets you store `Insert<()>` in a container, for example
    /// `HashMap<std::any::TypeId, Insert<()>>`.
    ///
    /// ```ignore
    /// let mut insert = client.insert::<MyRow>("table").await?.into_any();
    /// insert.write_any(&MyRow { ... }).await?;
    /// insert.end().await?;
    /// ```
    pub fn into_any(self) -> Insert<()> {
        Insert {
            insert: self.insert,
            row_metadata: self.row_metadata,
            _marker: PhantomData,
        }
    }

    /// Sets timeouts for different operations.
    ///
    /// `send_timeout` restricts time on sending a data chunk to a socket.
    /// `None` disables the timeout, it's a default.
    /// It's roughly equivalent to `tokio::time::timeout(insert.write(...))`.
    ///
    /// `end_timeout` restricts time on waiting for a response from the CH
    /// server. Thus, it includes all work needed to handle `INSERT` by the
    /// CH server, e.g. handling all materialized views and so on.
    /// `None` disables the timeout, it's a default.
    /// It's roughly equivalent to `tokio::time::timeout(insert.end(...))`.
    ///
    /// These timeouts are much more performant (~x10) than wrapping `write()`
    /// and `end()` calls into `tokio::time::timeout()`.
    pub fn with_timeouts(
        mut self,
        send_timeout: Option<Duration>,
        end_timeout: Option<Duration>,
    ) -> Self {
        self.set_timeouts(send_timeout, end_timeout);
        self
    }

    /// Configure the [roles] to use when executing `INSERT` statements.
    ///
    /// Overrides any roles previously set by this method, [`Insert::with_option`],
    /// [`Client::with_roles`] or [`Client::with_option`].
    ///
    /// An empty iterator may be passed to clear the set roles.
    ///
    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
    ///
    /// # Panics
    /// If called after the request is started, e.g., after [`Insert::write`].
    pub fn with_roles(mut self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.insert.expect_client_mut().set_roles(roles);
        self
    }

    /// Clear any explicit [roles] previously set on this `Insert` or inherited from [`Client`].
    ///
    /// Overrides any roles previously set by [`Insert::with_roles`], [`Insert::with_option`],
    /// [`Client::with_roles`] or [`Client::with_option`].
    ///
    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
    ///
    /// # Panics
    /// If called after the request is started, e.g., after [`Insert::write`].
    pub fn with_default_roles(mut self) -> Self {
        self.insert.expect_client_mut().clear_roles();
        self
    }

    /// Similar to [`Client::with_option`], but for this particular INSERT
    /// statement only.
    ///
    /// # Panics
    /// If called after the request is started, e.g., after [`Insert::write`].
    #[track_caller]
    pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.insert.expect_client_mut().set_option(name, value);
        self
    }

    /// In-place counterpart of [`Insert::with_timeouts`]; delegates to the transport.
    pub(crate) fn set_timeouts(
        &mut self,
        send_timeout: Option<Duration>,
        end_timeout: Option<Duration>,
    ) {
        self.insert.set_timeouts(send_timeout, end_timeout);
    }

    /// Serializes the provided row into an internal buffer.
    /// Once the buffer is full, it's sent to a background task writing to the
    /// socket.
    ///
    /// Close to:
    ///
    /// ```ignore
    /// async fn write<T>(&self, row: &T) -> Result<usize>;
    /// ```
    ///
    /// A returned future doesn't depend on the row's lifetime.
    ///
    /// Returns an error if the row cannot be serialized or the background task
    /// failed. Once failed, the whole `INSERT` is aborted and cannot be
    /// used anymore.
    ///
    /// # Panics
    ///
    /// If called after the previous call that returned an error.
    pub fn write<'a>(
        &'a mut self,
        row: &T::Value<'_>,
    ) -> impl Future<Output = Result<()>> + 'a + Send
    where
        T: RowWrite,
    {
        // Serialize eagerly, before entering the async block, so the returned
        // future does not borrow `row` (see the doc note on lifetimes above).
        let result = self.do_write(row);

        async move {
            result?;
            // Flush only once enough data has accumulated, to keep frames
            // close to `BUFFER_SIZE` and spread the network load.
            if self.insert.buf_len() >= MIN_CHUNK_SIZE {
                self.insert.flush().await?;
            }
            Ok(())
        }
    }

    /// Writes a row of any type `R` into this insert.
    ///
    /// Unlike [`Insert::write`], the row type `R` is inferred directly from the
    /// argument, no turbofish needed:
    /// ```ignore
    /// insert.write_any(&MyRow { ... }).await?;
    /// ```
    ///
    /// This is the primary write method for type-erased [`Insert<()>`] obtained
    /// from [`Client::insert_any`].
    pub fn write_any<'a, R>(&'a mut self, row: &R) -> impl Future<Output = Result<()>> + 'a + Send
    where
        R: Row + Serialize,
    {
        // Same eager-serialization pattern as `write`: the future must not
        // capture `row`'s lifetime.
        let result = self.do_write_any(row);

        async move {
            result?;
            if self.insert.buf_len() >= MIN_CHUNK_SIZE {
                self.insert.flush().await?;
            }
            Ok(())
        }
    }

    /// Returns the number of bytes written, not including the RBWNAT header.
    #[inline(always)]
    pub(crate) fn do_write(&mut self, row: &T::Value<'_>) -> Result<usize>
    where
        T: RowWrite,
    {
        self.do_write_any(row)
    }

    /// Serializes one row into the internal buffer, validating against the
    /// cached metadata when available. Aborts the whole insert on a
    /// serialization error. Returns the number of bytes written.
    #[inline(always)]
    pub(crate) fn do_write_any<R>(&mut self, row: &R) -> Result<usize>
    where
        R: Row + Serialize,
    {
        // We don't want to wait for the buffer to be full before we start the request,
        // in the event of an error.
        self.init_request_if_required()?;

        // The following calls need an `impl BufMut`
        let buffer = self.insert.buffer_mut();

        // Measure the byte count via the buffer length delta around serialization.
        let old_buf_size = buffer.len();
        let result = match &self.row_metadata {
            Some(metadata) => serialize_with_validation(&mut *buffer, row, metadata),
            None => serialize_row_binary(&mut *buffer, row),
        };
        let written = buffer.len() - old_buf_size;

        // A failed serialization leaves a partially written row in the buffer,
        // so the whole INSERT must be aborted (see `write`'s contract).
        if result.is_err() {
            self.abort();
        }

        result.and(Ok(written))
    }

    /// Ends `INSERT`, the server starts processing the data.
    ///
    /// Succeeds if the server returns 200, that means the `INSERT` was handled
    /// successfully, including all materialized views and quorum writes.
    ///
    /// NOTE: If it isn't called, the whole `INSERT` is aborted.
    pub async fn end(mut self) -> Result<()> {
        self.insert.end().await
    }

    /// Lazily starts the underlying request. On a freshly started request with
    /// metadata present, writes the RBWNAT columns header before any row data,
    /// aborting the insert if the header cannot be written.
    fn init_request_if_required(&mut self) -> Result<()> {
        let fresh_request = self.insert.init_request_if_required()?;

        if fresh_request && let Some(metadata) = &self.row_metadata {
            put_rbwnat_columns_header(&metadata.columns, self.insert.buffer_mut())
                .inspect_err(|_| self.abort())?;
        }

        Ok(())
    }

    /// Marks the underlying insert as aborted (delegates to the transport).
    fn abort(&mut self) {
        self.insert.abort();
    }
}