use crate::Row;
use crate::error::Error;
use crate::error::Result;
use crate::row::RowKind;
use clickhouse_types::Column;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
#[derive(Debug, PartialEq)]
pub(crate) enum AccessType {
WithSeqAccess,
WithMapAccess(Vec<usize>),
}
pub(crate) struct RowMetadata {
pub(crate) columns: Vec<Column>,
pub(crate) access_type: AccessType,
}
pub(crate) struct InsertMetadata {
pub(crate) row_metadata: RowMetadata,
pub(crate) column_default_kinds: Vec<ColumnDefaultKind>,
pub(crate) column_lookup: HashMap<String, usize>,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum ColumnDefaultKind {
Unset,
Default,
Materialized,
Ephemeral,
Alias,
}
impl RowMetadata {
pub(crate) fn new_for_cursor<T: Row>(columns: Vec<Column>) -> Result<Self> {
let access_type = match T::KIND {
RowKind::Primitive => {
if columns.len() != 1 {
return Err(Error::SchemaMismatch(format!(
"While processing a primitive row: \
expected only 1 column in the database schema, \
but got {} instead.\n#### All schema columns:\n{}",
columns.len(),
join_panic_schema_hint(&columns),
)));
}
AccessType::WithSeqAccess }
RowKind::Tuple => {
if T::COLUMN_COUNT != columns.len() {
return Err(Error::SchemaMismatch(format!(
"While processing a tuple row: database schema has {} columns, \
but the tuple definition has {} fields in total.\
\n#### All schema columns:\n{}",
columns.len(),
T::COLUMN_COUNT,
join_panic_schema_hint(&columns),
)));
}
AccessType::WithSeqAccess }
RowKind::Vec => {
if columns.len() != 1 {
return Err(Error::SchemaMismatch(format!(
"While processing a row defined as a vector: \
expected only 1 column in the database schema, \
but got {} instead.\n#### All schema columns:\n{}",
columns.len(),
join_panic_schema_hint(&columns),
)));
}
AccessType::WithSeqAccess }
RowKind::Struct => {
if columns.len() != T::COLUMN_NAMES.len() {
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: database schema has {} columns, \
but the struct definition has {} fields.\
\n#### All struct fields:\n{}\n#### All schema columns:\n{}",
T::NAME,
columns.len(),
T::COLUMN_NAMES.len(),
join_panic_schema_hint(T::COLUMN_NAMES),
join_panic_schema_hint(&columns),
)));
}
let mut mapping = Vec::with_capacity(T::COLUMN_NAMES.len());
let mut expected_index = 0;
let mut should_use_map = false;
for col in &columns {
if let Some(index) = T::COLUMN_NAMES.iter().position(|field| col.name == *field)
{
if index != expected_index {
should_use_map = true
}
expected_index += 1;
mapping.push(index);
} else {
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: database schema has a column {col} \
that was not found in the struct definition.\
\n#### All struct fields:\n{}\n#### All schema columns:\n{}",
T::NAME,
join_panic_schema_hint(T::COLUMN_NAMES),
join_panic_schema_hint(&columns),
)));
}
}
if should_use_map {
AccessType::WithMapAccess(mapping)
} else {
AccessType::WithSeqAccess
}
}
};
Ok(Self {
columns,
access_type,
})
}
#[inline]
pub(crate) fn get_schema_index(&self, struct_idx: usize) -> Result<usize> {
match &self.access_type {
AccessType::WithMapAccess(mapping) => {
if struct_idx < mapping.len() {
Ok(mapping[struct_idx])
} else {
Err(Error::SchemaMismatch(
"Struct has more fields than columns in the database schema".to_string(),
))
}
}
AccessType::WithSeqAccess => Ok(struct_idx), }
}
#[inline]
pub(crate) fn is_field_order_wrong(&self) -> bool {
matches!(self.access_type, AccessType::WithMapAccess(_))
}
}
impl FromStr for ColumnDefaultKind {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"" => Ok(Self::Unset),
"DEFAULT" => Ok(Self::Default),
"MATERIALIZED" => Ok(Self::Materialized),
"EPHEMERAL" => Ok(Self::Ephemeral),
"ALIAS" => Ok(Self::Alias),
other => Err(Error::Other(
format!("unknown column default_kind {other}").into(),
)),
}
}
}
impl ColumnDefaultKind {
pub(crate) fn is_immutable(self) -> bool {
matches!(self, Self::Materialized | Self::Alias)
}
pub(crate) fn has_default(self) -> bool {
matches!(self, Self::Default | Self::Materialized | Self::Alias)
}
pub(crate) fn to_str(self) -> &'static str {
match self {
ColumnDefaultKind::Unset => "",
ColumnDefaultKind::Default => "DEFAULT",
ColumnDefaultKind::Materialized => "MATERIALIZED",
ColumnDefaultKind::Ephemeral => "EPHEMERAL",
ColumnDefaultKind::Alias => "ALIAS",
}
}
}
impl Display for ColumnDefaultKind {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(self.to_str())
}
}
impl InsertMetadata {
pub(crate) fn to_row<T: Row>(&self) -> Result<RowMetadata> {
if T::KIND != RowKind::Struct {
return Err(Error::SchemaMismatch(format!(
"SerializerRowMetadata can only be created for structs, \
but got {:?} instead.\n#### All schema columns:\n{}",
T::KIND,
join_panic_schema_hint(&self.row_metadata.columns),
)));
}
let mut result_columns: Vec<Column> = Vec::with_capacity(T::COLUMN_COUNT);
let mut set_columns: Vec<bool> = vec![false; self.row_metadata.columns.len()];
for struct_column_name in T::COLUMN_NAMES {
match self.column_lookup.get(*struct_column_name) {
Some(&col) => {
if self.column_default_kinds[col].is_immutable() {
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: column {struct_column_name} is immutable (declared as `{}`)",
T::NAME,
self.column_default_kinds[col],
)));
}
set_columns[col] = true;
result_columns.push(self.row_metadata.columns[col].clone())
}
None => {
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: database schema has no column named {struct_column_name}.\
\n#### All struct fields:\n{}\n#### All schema columns:\n{}",
T::NAME,
join_panic_schema_hint(T::COLUMN_NAMES),
join_panic_schema_hint(&self.row_metadata.columns),
)));
}
}
}
let missing_columns = set_columns.iter().enumerate().filter_map(|(col, &is_set)| {
if is_set || self.column_default_kinds[col].has_default() {
return None;
}
Some(&self.row_metadata.columns[col])
});
let missing_columns_hint = join_panic_schema_hint(missing_columns);
if !missing_columns_hint.is_empty() {
return Err(Error::SchemaMismatch(format!(
"While processing struct {}: the following non-default columns are missing:\n{missing_columns_hint}\
\n#### All struct fields:\n{}\n#### All schema columns:\n{}",
T::NAME,
join_panic_schema_hint(T::COLUMN_NAMES),
join_panic_schema_hint(&self.row_metadata.columns),
)));
}
Ok(RowMetadata {
columns: result_columns,
access_type: AccessType::WithSeqAccess, })
}
}
fn join_panic_schema_hint<T: Display>(col: impl IntoIterator<Item = T>) -> String {
col.into_iter()
.map(|c| format!("- {c}"))
.collect::<Vec<String>>()
.join("\n")
}