use crate::errors::{DynoxideError, Result};
use crate::storage::TableMetadata;
use crate::storage_backend::{IndexWriteOp, StorageBackend};
use crate::types::{GlobalSecondaryIndex, Item, KeyType, ProjectionType};
use std::collections::HashMap;
pub struct IndexDef {
pub index_name: String,
pub pk_attr: String,
pub sk_attr: Option<String>,
pub projection_type: ProjectionType,
pub non_key_attributes: Option<Vec<String>>,
}
pub type GsiDef = IndexDef;
impl IndexDef {
pub fn index_key_strings(&self, item: &Item) -> Option<(String, String)> {
let pk = item.get(&self.pk_attr)?.to_key_string()?;
let sk = match self.sk_attr {
Some(ref sk_attr) => item.get(sk_attr)?.to_key_string()?,
None => String::new(),
};
Some((pk, sk))
}
}
pub fn gsi_to_def(gsi: &GlobalSecondaryIndex) -> Result<GsiDef> {
let pk_attr = gsi
.key_schema
.iter()
.find(|k| k.key_type == KeyType::HASH)
.map(|k| k.attribute_name.clone())
.ok_or_else(|| DynoxideError::InternalServerError("GSI missing HASH key".to_string()))?;
let sk_attr = gsi
.key_schema
.iter()
.find(|k| k.key_type == KeyType::RANGE)
.map(|k| k.attribute_name.clone());
Ok(GsiDef {
index_name: gsi.index_name.clone(),
pk_attr,
sk_attr,
projection_type: gsi.projection.projection_type.clone().unwrap_or_default(),
non_key_attributes: gsi.projection.non_key_attributes.clone(),
})
}
pub fn parse_gsi_defs(meta: &TableMetadata) -> Result<Vec<GsiDef>> {
let gsis: Vec<GlobalSecondaryIndex> = match meta.gsi_definitions.as_ref() {
Some(json) => serde_json::from_str(json)
.map_err(|e| DynoxideError::InternalServerError(format!("Bad GSI JSON: {e}")))?,
None => return Ok(Vec::new()),
};
gsis.iter().map(gsi_to_def).collect()
}
pub fn build_index_item(
item: &Item,
index: &IndexDef,
table_pk: &str,
table_sk: Option<&str>,
) -> Item {
match index.projection_type {
ProjectionType::ALL => item.clone(),
ProjectionType::KEYS_ONLY => {
let mut projected = HashMap::new();
if let Some(v) = item.get(table_pk) {
projected.insert(table_pk.to_string(), v.clone());
}
if let Some(sk) = table_sk {
if let Some(v) = item.get(sk) {
projected.insert(sk.to_string(), v.clone());
}
}
if let Some(v) = item.get(&index.pk_attr) {
projected.insert(index.pk_attr.clone(), v.clone());
}
if let Some(ref sk) = index.sk_attr {
if let Some(v) = item.get(sk) {
projected.insert(sk.clone(), v.clone());
}
}
projected
}
ProjectionType::INCLUDE => {
let mut projected = HashMap::new();
if let Some(v) = item.get(table_pk) {
projected.insert(table_pk.to_string(), v.clone());
}
if let Some(sk) = table_sk {
if let Some(v) = item.get(sk) {
projected.insert(sk.to_string(), v.clone());
}
}
if let Some(v) = item.get(&index.pk_attr) {
projected.insert(index.pk_attr.clone(), v.clone());
}
if let Some(ref sk) = index.sk_attr {
if let Some(v) = item.get(sk) {
projected.insert(sk.clone(), v.clone());
}
}
if let Some(ref attrs) = index.non_key_attributes {
for attr in attrs {
if let Some(v) = item.get(attr) {
projected.insert(attr.clone(), v.clone());
}
}
}
projected
}
}
}
#[allow(clippy::too_many_arguments)]
pub async fn maintain_gsis_after_write<S: StorageBackend>(
storage: &S,
table_name: &str,
meta: &TableMetadata,
table_pk_str: &str,
table_sk_str: &str,
item: &Item,
table_pk_attr: &str,
table_sk_attr: Option<&str>,
) -> Result<HashMap<String, f64>> {
let gsi_defs = parse_gsi_defs(meta)?;
let mut gsi_units: HashMap<String, f64> = HashMap::new();
let mut ops: Vec<IndexWriteOp> = Vec::new();
for gsi in &gsi_defs {
ops.push(IndexWriteOp::DeleteGsi {
table_name: table_name.to_string(),
index_name: gsi.index_name.clone(),
table_pk: table_pk_str.to_string(),
table_sk: table_sk_str.to_string(),
});
if let Some((gsi_pk, gsi_sk)) = gsi.index_key_strings(item) {
let projected = build_index_item(item, gsi, table_pk_attr, table_sk_attr);
let projected_size = crate::types::item_size(&projected);
let item_json = serde_json::to_string(&projected)
.map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
ops.push(IndexWriteOp::InsertGsi {
table_name: table_name.to_string(),
index_name: gsi.index_name.clone(),
gsi_pk,
gsi_sk,
table_pk: table_pk_str.to_string(),
table_sk: table_sk_str.to_string(),
item_json,
});
gsi_units.insert(
gsi.index_name.clone(),
crate::types::write_capacity_units(projected_size),
);
}
}
storage.apply_index_writes(&ops).await?;
Ok(gsi_units)
}
pub async fn maintain_gsis_after_delete<S: StorageBackend>(
storage: &S,
table_name: &str,
meta: &TableMetadata,
table_pk_str: &str,
table_sk_str: &str,
) -> Result<HashMap<String, f64>> {
let gsi_defs = parse_gsi_defs(meta)?;
let mut gsi_units: HashMap<String, f64> = HashMap::new();
let mut ops: Vec<IndexWriteOp> = Vec::new();
for gsi in &gsi_defs {
ops.push(IndexWriteOp::DeleteGsi {
table_name: table_name.to_string(),
index_name: gsi.index_name.clone(),
table_pk: table_pk_str.to_string(),
table_sk: table_sk_str.to_string(),
});
gsi_units.insert(gsi.index_name.clone(), 1.0);
}
storage.apply_index_writes(&ops).await?;
Ok(gsi_units)
}
pub fn parse_gsi_key_schema(
meta: &TableMetadata,
index_name: &str,
) -> Result<(String, Option<String>)> {
let gsi_defs = parse_gsi_defs(meta)?;
let gsi = gsi_defs
.into_iter()
.find(|g| g.index_name == index_name)
.ok_or_else(|| {
DynoxideError::ValidationException(format!(
"The table does not have the specified index: {index_name}"
))
})?;
Ok((gsi.pk_attr, gsi.sk_attr))
}