//! INSERT support for dynamically-typed [`DataRow`] values.
//!
//! Path: `clickhouse/insert_data_row.rs`
1use crate::{
2    AccessType, Client, Result, data_row::DataRow, error::Error, formats,
3    insert_formatted::BufInsertFormatted, row_metadata::RowMetadata, rowbinary::serialize_data_row,
4};
5use clickhouse_types::{Column, put_rbwnat_columns_header};
6use std::{future::Future, time::Duration};
7
// Match the buffer constants used by `Insert<T>` so both insert paths share
// the same chunking behaviour.
const BUFFER_SIZE: usize = 256 * 1024;
// Flush once the buffer is within 2 KiB of capacity, before it would need to grow.
const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 2048;
11
/// An INSERT specialised for [`DataRow`] values.
///
/// Obtained via [`crate::Client::insert_data_row`].
///
/// When the client has validation enabled (the default), column types are fetched
/// from the server schema and used to correctly encode Nullable columns,
/// Date/DateTime/UUID values, and so on.
///
/// Call [`DataRowInsert::end`] to finalise the INSERT.
#[cfg_attr(docsrs, doc(cfg(feature = "sea-ql")))]
#[must_use]
pub struct DataRowInsert {
    /// Buffered transport for the formatted INSERT statement; all row bytes go
    /// through this buffer and are flushed in large chunks.
    insert: BufInsertFormatted,
    /// Schema metadata used to write the RowBinaryWithNamesAndTypes header on the
    /// first write.  `None` when validation is disabled.
    row_metadata: Option<RowMetadata>,
    /// Schema columns in DataRow column order, used for type-guided serialization.
    /// `None` when validation is disabled (best-effort RowBinary encoding).
    columns: Option<Box<[Column]>>,
}
32
33impl Client {
34    /// Starts an INSERT for dynamically-typed [`DataRow`] values.
35    ///
36    /// `proto` is used to determine column names; all rows passed to
37    /// [`DataRowInsert::write_row`] must have values in the same column order.
38    ///
39    /// When validation is enabled (the default), the table schema is fetched once
40    /// and cached. This enables correct encoding of Nullable columns,
41    /// Date/DateTime/UUID types, and schema mismatch detection.
42    ///
43    /// When validation is disabled, a best-effort RowBinary encoding is used that
44    /// works correctly only for non-nullable primitive columns.
45    #[cfg_attr(docsrs, doc(cfg(feature = "sea-ql")))]
46    pub async fn insert_data_row(&self, table: &str, proto: &DataRow) -> Result<DataRowInsert> {
47        // Build escaped field list from proto column names.
48        let fields: String =
49            proto
50                .column_names
51                .iter()
52                .enumerate()
53                .fold(String::new(), |mut s, (i, col)| {
54                    if i > 0 {
55                        s.push(',');
56                    }
57                    crate::sql::escape::identifier(col.as_ref(), &mut s).expect("impossible");
58                    s
59                });
60
61        if self.get_validation() {
62            let meta = self.get_insert_metadata(table).await?;
63
64            // Resolve each DataRow column to its schema column (in DataRow order).
65            let mut ordered_columns: Vec<Column> = Vec::with_capacity(proto.column_names.len());
66            for col_name in proto.column_names.iter() {
67                let idx = meta
68                    .column_lookup
69                    .get(col_name.as_ref())
70                    .copied()
71                    .ok_or_else(|| {
72                        Error::SchemaMismatch(format!(
73                            "insert_data_row: column '{col_name}' not found in table '{table}'"
74                        ))
75                    })?;
76                if meta.column_default_kinds[idx].is_immutable() {
77                    return Err(Error::SchemaMismatch(format!(
78                        "insert_data_row: column '{col_name}' is immutable ({})",
79                        meta.column_default_kinds[idx],
80                    )));
81                }
82                ordered_columns.push(meta.row_metadata.columns[idx].clone());
83            }
84
85            let row_metadata = RowMetadata {
86                columns: ordered_columns.clone(),
87                access_type: AccessType::WithSeqAccess,
88            };
89
90            let sql = format!(
91                "INSERT INTO {table}({fields}) FORMAT {}",
92                formats::ROW_BINARY_WITH_NAMES_AND_TYPES
93            );
94            Ok(DataRowInsert::new(
95                self,
96                sql,
97                Some(row_metadata),
98                Some(ordered_columns.into_boxed_slice()),
99            ))
100        } else {
101            let sql = format!(
102                "INSERT INTO {table}({fields}) FORMAT {}",
103                formats::ROW_BINARY
104            );
105            Ok(DataRowInsert::new(self, sql, None, None))
106        }
107    }
108}
109
impl DataRowInsert {
    /// Creates a new `DataRowInsert`.
    ///
    /// * `sql`           – complete `INSERT INTO … FORMAT …` statement.
    /// * `row_metadata`  – schema metadata for writing the RBWNAT header; `None` for plain RowBinary.
    /// * `columns`       – column definitions in DataRow order for type-guided serialisation.
    pub(crate) fn new(
        client: &Client,
        sql: String,
        row_metadata: Option<RowMetadata>,
        columns: Option<Box<[Column]>>,
    ) -> Self {
        Self {
            // Buffer locally and flush in large chunks (see BUFFER_SIZE above).
            insert: client
                .insert_formatted_with(sql)
                .buffered_with_capacity(BUFFER_SIZE),
            row_metadata,
            columns,
        }
    }

    /// Sets send/end timeouts; see [`crate::insert::Insert::with_timeouts`] for details.
    pub fn with_timeouts(
        mut self,
        send_timeout: Option<Duration>,
        end_timeout: Option<Duration>,
    ) -> Self {
        self.insert.set_timeouts(send_timeout, end_timeout);
        self
    }

    /// Serializes `row` and appends it to the INSERT buffer.
    ///
    /// Flushes the buffer to the network when it exceeds the internal chunk size.
    pub fn write_row<'a>(
        &'a mut self,
        row: &'a DataRow,
    ) -> impl Future<Output = Result<()>> + 'a + Send {
        // Serialize synchronously, before the async block, so encoding errors
        // surface even if the caller drops the future; the future itself only
        // performs the (potentially awaiting) network flush.
        let result = self.do_write_row(row);
        async move {
            result?;
            if self.insert.buf_len() >= MIN_CHUNK_SIZE {
                self.insert.flush().await?;
            }
            Ok(())
        }
    }

    /// Finalises the INSERT, causing the server to process all buffered data.
    pub async fn end(mut self) -> Result<()> {
        self.insert.end().await
    }

    /// Serializes one row into the buffer, writing the RBWNAT columns header
    /// first when this is the initial write of a fresh request.
    ///
    /// On any serialization error the underlying insert is aborted so a
    /// half-written request is never sent.
    fn do_write_row(&mut self, row: &DataRow) -> Result<()> {
        let fresh = self.insert.init_request_if_required()?;
        if fresh {
            // `row_metadata` is `Some` only when validation is enabled; the
            // names-and-types header goes once at the start of the body.
            if let Some(metadata) = &self.row_metadata {
                put_rbwnat_columns_header(&metadata.columns, self.insert.buffer_mut())
                    .inspect_err(|_| self.insert.abort())?;
            }
        }

        // `columns` is `None` without validation → best-effort RowBinary encoding.
        let result = serialize_data_row(self.insert.buffer_mut(), row, self.columns.as_deref());
        if result.is_err() {
            self.insert.abort();
        }
        result
    }
}
179
#[cfg(feature = "arrow")]
impl Client {
    /// Starts an INSERT for an Arrow [`RecordBatch`].
    ///
    /// Column names are taken from the batch schema. All batches passed to
    /// [`DataRowInsert::write_batch`] must have a matching schema.
    ///
    /// Internally builds a prototype [`DataRow`] from the schema and delegates
    /// to [`Client::insert_data_row`], so validation and typed encoding apply
    /// in the same way.
    #[cfg_attr(docsrs, doc(cfg(feature = "arrow")))]
    pub async fn insert_arrow(
        &self,
        table: &str,
        schema: &sea_orm_arrow::arrow::datatypes::Schema,
    ) -> Result<DataRowInsert> {
        use std::sync::Arc;

        // Gather the schema field names into the shared-slice form DataRow uses.
        let mut names: Vec<Arc<str>> = Vec::with_capacity(schema.fields().len());
        for field in schema.fields() {
            names.push(Arc::from(field.name().as_str()));
        }

        // The prototype row carries only the column names; types and values
        // are irrelevant for starting the INSERT.
        let proto = DataRow {
            column_names: Arc::from(names),
            column_types: Arc::from([]),
            values: Vec::new(),
        };
        self.insert_data_row(table, &proto).await
    }
}
211
#[cfg(feature = "arrow")]
impl DataRowInsert {
    /// Serializes every row in `batch` and appends them to the INSERT buffer.
    ///
    /// Each Arrow column element is converted to a [`sea_query::Value`] via
    /// [`sea_orm_arrow::arrow_array_to_value`], then written as a [`DataRow`].
    /// The buffer is flushed to the network whenever it exceeds the internal
    /// chunk size, so large batches stream incrementally.
    #[cfg_attr(docsrs, doc(cfg(feature = "arrow")))]
    pub async fn write_batch(
        &mut self,
        batch: &sea_orm_arrow::arrow::array::RecordBatch,
    ) -> Result<()> {
        use std::sync::Arc;

        // Fetch the schema once and reuse it for both the column-name list and
        // the per-row conversions below.
        let schema = batch.schema();
        let column_names: Arc<[Arc<str>]> = schema
            .fields()
            .iter()
            .map(|f| Arc::from(f.name().as_str()))
            .collect();
        let arrow_columns = batch.columns();

        for row in 0..batch.num_rows() {
            // Convert element `row` of every column into a dynamic value,
            // short-circuiting on the first conversion error.
            let values = arrow_columns
                .iter()
                .zip(schema.fields())
                .map(|(col, field)| {
                    crate::arrow::value::element_to_value(col.as_ref(), field.data_type(), row)
                })
                .collect::<Result<Vec<_>>>()?;

            let data_row = DataRow {
                // O(1) refcount bump per row; the name slice is shared.
                column_names: column_names.clone(),
                column_types: Arc::from([]),
                values,
            };
            self.do_write_row(&data_row)?;

            // Flush mid-batch so large batches stream instead of buffering fully.
            if self.insert.buf_len() >= MIN_CHUNK_SIZE {
                self.insert.flush().await?;
            }
        }
        Ok(())
    }
}