clickhouse/
insert_data_row.rs1use crate::{
2 AccessType, Client, Result, data_row::DataRow, error::Error, formats,
3 insert_formatted::BufInsertFormatted, row_metadata::RowMetadata, rowbinary::serialize_data_row,
4};
5use clickhouse_types::{Column, put_rbwnat_columns_header};
6use std::{future::Future, time::Duration};
7
/// Capacity of the in-memory buffer that accumulates serialized rows
/// before they are flushed to the network (256 KiB).
const BUFFER_SIZE: usize = 256 * 1024;
/// Flush threshold: once the buffer holds at least this many bytes, a chunk
/// is sent. Kept slightly below BUFFER_SIZE — presumably as headroom for the
/// row currently being encoded, so a flush triggers before the buffer grows.
const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 2048;
11
#[cfg_attr(docsrs, doc(cfg(feature = "sea-ql")))]
#[must_use]
pub struct DataRowInsert {
    // Buffered formatted-insert stream that owns the underlying request.
    insert: BufInsertFormatted,
    // Column metadata used to emit the RBWNAT header on a fresh request;
    // `Some` only when client-side validation was enabled at creation.
    row_metadata: Option<RowMetadata>,
    // Same ordered column list, handed to `serialize_data_row` for each row;
    // `None` when validation is disabled (plain RowBinary).
    columns: Option<Box<[Column]>>,
}
32
33impl Client {
34 #[cfg_attr(docsrs, doc(cfg(feature = "sea-ql")))]
46 pub async fn insert_data_row(&self, table: &str, proto: &DataRow) -> Result<DataRowInsert> {
47 let fields: String =
49 proto
50 .column_names
51 .iter()
52 .enumerate()
53 .fold(String::new(), |mut s, (i, col)| {
54 if i > 0 {
55 s.push(',');
56 }
57 crate::sql::escape::identifier(col.as_ref(), &mut s).expect("impossible");
58 s
59 });
60
61 if self.get_validation() {
62 let meta = self.get_insert_metadata(table).await?;
63
64 let mut ordered_columns: Vec<Column> = Vec::with_capacity(proto.column_names.len());
66 for col_name in proto.column_names.iter() {
67 let idx = meta
68 .column_lookup
69 .get(col_name.as_ref())
70 .copied()
71 .ok_or_else(|| {
72 Error::SchemaMismatch(format!(
73 "insert_data_row: column '{col_name}' not found in table '{table}'"
74 ))
75 })?;
76 if meta.column_default_kinds[idx].is_immutable() {
77 return Err(Error::SchemaMismatch(format!(
78 "insert_data_row: column '{col_name}' is immutable ({})",
79 meta.column_default_kinds[idx],
80 )));
81 }
82 ordered_columns.push(meta.row_metadata.columns[idx].clone());
83 }
84
85 let row_metadata = RowMetadata {
86 columns: ordered_columns.clone(),
87 access_type: AccessType::WithSeqAccess,
88 };
89
90 let sql = format!(
91 "INSERT INTO {table}({fields}) FORMAT {}",
92 formats::ROW_BINARY_WITH_NAMES_AND_TYPES
93 );
94 Ok(DataRowInsert::new(
95 self,
96 sql,
97 Some(row_metadata),
98 Some(ordered_columns.into_boxed_slice()),
99 ))
100 } else {
101 let sql = format!(
102 "INSERT INTO {table}({fields}) FORMAT {}",
103 formats::ROW_BINARY
104 );
105 Ok(DataRowInsert::new(self, sql, None, None))
106 }
107 }
108}
109
110impl DataRowInsert {
111 pub(crate) fn new(
117 client: &Client,
118 sql: String,
119 row_metadata: Option<RowMetadata>,
120 columns: Option<Box<[Column]>>,
121 ) -> Self {
122 Self {
123 insert: client
124 .insert_formatted_with(sql)
125 .buffered_with_capacity(BUFFER_SIZE),
126 row_metadata,
127 columns,
128 }
129 }
130
131 pub fn with_timeouts(
133 mut self,
134 send_timeout: Option<Duration>,
135 end_timeout: Option<Duration>,
136 ) -> Self {
137 self.insert.set_timeouts(send_timeout, end_timeout);
138 self
139 }
140
141 pub fn write_row<'a>(
145 &'a mut self,
146 row: &'a DataRow,
147 ) -> impl Future<Output = Result<()>> + 'a + Send {
148 let result = self.do_write_row(row);
149 async move {
150 result?;
151 if self.insert.buf_len() >= MIN_CHUNK_SIZE {
152 self.insert.flush().await?;
153 }
154 Ok(())
155 }
156 }
157
158 pub async fn end(mut self) -> Result<()> {
160 self.insert.end().await
161 }
162
163 fn do_write_row(&mut self, row: &DataRow) -> Result<()> {
164 let fresh = self.insert.init_request_if_required()?;
165 if fresh {
166 if let Some(metadata) = &self.row_metadata {
167 put_rbwnat_columns_header(&metadata.columns, self.insert.buffer_mut())
168 .inspect_err(|_| self.insert.abort())?;
169 }
170 }
171
172 let result = serialize_data_row(self.insert.buffer_mut(), row, self.columns.as_deref());
173 if result.is_err() {
174 self.insert.abort();
175 }
176 result
177 }
178}
179
#[cfg(feature = "arrow")]
impl Client {
    /// Starts an insert into `table` whose column set is taken from an Arrow
    /// `Schema` (one ClickHouse column per Arrow field, in field order).
    #[cfg_attr(docsrs, doc(cfg(feature = "arrow")))]
    pub async fn insert_arrow(
        &self,
        table: &str,
        schema: &sea_orm_arrow::arrow::datatypes::Schema,
    ) -> Result<DataRowInsert> {
        use std::sync::Arc;

        // Build a value-less prototype row carrying only the column names;
        // `insert_data_row` derives the INSERT column list from it.
        let names: Vec<Arc<str>> = schema
            .fields()
            .iter()
            .map(|field| Arc::from(field.name().as_str()))
            .collect();
        let proto = DataRow {
            column_names: Arc::from(names),
            column_types: Arc::from([]),
            values: Vec::new(),
        };
        self.insert_data_row(table, &proto).await
    }
}
211
#[cfg(feature = "arrow")]
impl DataRowInsert {
    /// Writes every row of an Arrow `RecordBatch` to the insert stream.
    ///
    /// The column-name list is built once and shared via `Arc`, so each row
    /// costs only a refcount bump; rows are serialized into the buffer and a
    /// network flush is issued whenever `MIN_CHUNK_SIZE` bytes accumulate,
    /// matching the chunking policy of `write_row`.
    ///
    /// # Errors
    /// Returns an error if an Arrow element cannot be converted to a value,
    /// or if serialization/flushing fails; serialization errors abort the
    /// underlying insert (see `do_write_row`).
    #[cfg_attr(docsrs, doc(cfg(feature = "arrow")))]
    pub async fn write_batch(
        &mut self,
        batch: &sea_orm_arrow::arrow::array::RecordBatch,
    ) -> Result<()> {
        use std::sync::Arc;

        // Fetch the schema exactly once: building `column_names` from a
        // second `batch.schema()` call would clone the schema Arc and walk
        // the fields twice for no benefit.
        let schema = batch.schema();
        let column_names: Arc<[Arc<str>]> = schema
            .fields()
            .iter()
            .map(|f| Arc::from(f.name().as_str()))
            .collect();
        let arrow_columns = batch.columns();

        for row in 0..batch.num_rows() {
            // Convert one Arrow row into owned values, short-circuiting on
            // the first conversion failure.
            let values = arrow_columns
                .iter()
                .zip(schema.fields())
                .map(|(col, field)| {
                    crate::arrow::value::element_to_value(col.as_ref(), field.data_type(), row)
                })
                .collect::<Result<Vec<_>>>()?;

            let data_row = DataRow {
                column_names: column_names.clone(),
                column_types: Arc::from([]),
                values,
            };
            self.do_write_row(&data_row)?;

            // Same chunking policy as `write_row`.
            if self.insert.buf_len() >= MIN_CHUNK_SIZE {
                self.insert.flush().await?;
            }
        }
        Ok(())
    }
}