use super::{
AttributeDefinition, BillingMode, Connection, GlobalSecondaryIndex, Projection, ProjectionType,
Result, Table, TypeExt, db, ddb_key_schema,
};
impl Connection {
pub(crate) async fn create_table(
&mut self,
schema: &db::Schema,
table: &Table,
reset: bool,
) -> Result<()> {
if reset {
let _ = self
.client
.delete_table()
.table_name(&table.name)
.send()
.await;
for index in &table.indices {
if !index.primary_key && index.unique {
let _ = self
.client
.delete_table()
.table_name(&index.name)
.send()
.await;
}
}
}
let mut defined_attributes = hashbrown::HashSet::new();
let pk_cols: Vec<&db::Column> = table.primary_key_columns().collect();
if pk_cols.is_empty() || pk_cols.len() > 2 {
return Err(toasty_core::Error::invalid_schema(format!(
"table '{}' primary key must have 1 or 2 columns, got {}",
table.name,
pk_cols.len()
)));
}
for col in &pk_cols {
defined_attributes.insert(col.id);
}
let pk_partition_cols = &pk_cols[..1];
let pk_range_cols = if pk_cols.len() > 1 {
&pk_cols[1..]
} else {
&[][..]
};
let mut gsis = vec![];
for index in &table.indices {
if index.primary_key || index.unique {
continue;
}
let partition_cols: Vec<&db::Column> = index
.columns
.iter()
.filter(|ic| ic.scope.is_partition())
.map(|ic| table.column(ic.column))
.collect();
let range_cols: Vec<&db::Column> = index
.columns
.iter()
.filter(|ic| ic.scope.is_local())
.map(|ic| table.column(ic.column))
.collect();
if partition_cols.is_empty() || partition_cols.len() > 4 {
return Err(toasty_core::Error::invalid_schema(format!(
"GSI '{}' must have 1 to 4 partition (HASH) columns, got {}",
index.name,
partition_cols.len()
)));
}
if range_cols.len() > 4 {
return Err(toasty_core::Error::invalid_schema(format!(
"GSI '{}' must have at most 4 range (RANGE) columns, got {}",
index.name,
range_cols.len()
)));
}
for col in &partition_cols {
defined_attributes.insert(col.id);
}
for col in &range_cols {
defined_attributes.insert(col.id);
}
gsis.push(
GlobalSecondaryIndex::builder()
.index_name(&index.name)
.set_key_schema(Some(ddb_key_schema(&partition_cols, &range_cols)))
.projection(
Projection::builder()
.projection_type(ProjectionType::All)
.build(),
)
.build()
.unwrap(),
);
}
let attribute_definitions = defined_attributes
.iter()
.map(|column_id| {
let column = table.column(*column_id);
let ty = column.ty.to_ddb_type();
AttributeDefinition::builder()
.attribute_name(&column.name)
.attribute_type(ty)
.build()
.unwrap()
})
.collect();
self.client
.create_table()
.table_name(&table.name)
.set_attribute_definitions(Some(attribute_definitions))
.set_key_schema(Some(ddb_key_schema(pk_partition_cols, pk_range_cols)))
.set_global_secondary_indexes(if gsis.is_empty() { None } else { Some(gsis) })
.billing_mode(BillingMode::PayPerRequest)
.send()
.await
.map_err(toasty_core::Error::driver_operation_failed)?;
for index in table.indices.iter().filter(|i| !i.primary_key && i.unique) {
assert_eq!(1, index.columns.len());
let pk = schema.column(index.columns[0].column);
self.client
.create_table()
.table_name(&index.name)
.set_key_schema(Some(ddb_key_schema(&[pk], &[])))
.attribute_definitions(
AttributeDefinition::builder()
.attribute_name(&pk.name)
.attribute_type(pk.ty.to_ddb_type())
.build()
.unwrap(),
)
.billing_mode(BillingMode::PayPerRequest)
.send()
.await
.map_err(toasty_core::Error::driver_operation_failed)?;
}
Ok(())
}
}