use crate::insert_formatted::{BufInsertFormatted, InsertFormatted};
use crate::row_metadata::RowMetadata;
use crate::rowbinary::{serialize_row_binary, serialize_with_validation};
use crate::{
Client, RowWrite,
error::Result,
formats,
row::{self, Row},
};
use clickhouse_types::put_rbwnat_columns_header;
use std::num::Saturating;
use std::{future::Future, marker::PhantomData, time::Duration};
const BUFFER_SIZE: usize = 256 * 1024;
const MIN_CHUNK_SIZE: usize = const {
assert!(BUFFER_SIZE.is_power_of_two());
BUFFER_SIZE - 2048
};
#[must_use]
pub struct Insert<T> {
insert: BufInsertFormatted,
row_metadata: Option<RowMetadata>,
sent_rows: Saturating<u64>,
_marker: PhantomData<fn() -> T>, }
impl<T> Insert<T> {
pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<RowMetadata>) -> Self
where
T: Row,
{
let fields = row::join_column_names::<T>()
.expect("the row type must be a struct or a wrapper around it");
let format = if row_metadata.is_some() {
formats::ROW_BINARY_WITH_NAMES_AND_TYPES
} else {
formats::ROW_BINARY
};
let sql = format!("INSERT INTO {table}({fields}) FORMAT {format}");
Self {
insert: InsertFormatted::new(client, sql, Some(table))
.buffered_with_capacity(BUFFER_SIZE),
row_metadata,
sent_rows: Saturating(0),
_marker: PhantomData,
}
}
pub fn with_timeouts(
mut self,
send_timeout: Option<Duration>,
end_timeout: Option<Duration>,
) -> Self {
self.set_timeouts(send_timeout, end_timeout);
self
}
pub fn with_roles(mut self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.insert.expect_client_mut().set_roles(roles);
self
}
pub fn with_default_roles(mut self) -> Self {
self.insert.expect_client_mut().clear_roles();
self
}
#[track_caller]
#[deprecated(since = "0.14.3", note = "please use `with_setting` instead")]
pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
self.insert.expect_client_mut().set_setting(name, value);
self
}
#[track_caller]
pub fn with_setting(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
self.insert.expect_client_mut().set_setting(name, value);
self
}
pub(crate) fn set_timeouts(
&mut self,
send_timeout: Option<Duration>,
end_timeout: Option<Duration>,
) {
self.insert.set_timeouts(send_timeout, end_timeout);
}
pub fn write<'a>(
&'a mut self,
row: &T::Value<'_>,
) -> impl Future<Output = Result<()>> + 'a + Send
where
T: RowWrite,
{
let result = self.do_write(row);
async move {
result?;
if self.insert.buf_len() >= MIN_CHUNK_SIZE {
self.insert.flush().await?;
}
self.sent_rows += 1;
Ok(())
}
}
#[inline(always)]
pub(crate) fn do_write(&mut self, row: &T::Value<'_>) -> Result<usize>
where
T: RowWrite,
{
self.init_request_if_required()?;
let buffer = self.insert.buffer_mut();
let old_buf_size = buffer.len();
let result = match &self.row_metadata {
Some(metadata) => serialize_with_validation(&mut *buffer, row, metadata),
None => serialize_row_binary(&mut *buffer, row),
};
let written = buffer.len() - old_buf_size;
if let Err(e) = &result {
e.record_in_current_span("error serializing row");
self.abort();
}
result.and(Ok(written))
}
pub async fn end(mut self) -> Result<()> {
tracing::record_all!(
self.insert.span(),
clickhouse.request.sent_rows = self.sent_rows.0,
);
self.insert.end().await
}
fn init_request_if_required(&mut self) -> Result<()> {
let fresh_request = self.insert.init_request_if_required()?;
if fresh_request && let Some(metadata) = &self.row_metadata {
put_rbwnat_columns_header(&metadata.columns, self.insert.buffer_mut())
.inspect_err(|_| self.abort())?;
}
Ok(())
}
fn abort(&mut self) {
self.insert.abort();
}
}