use super::*;
impl ResourceProvisioner {
pub(super) fn get_att_dynamodb_table(
&self,
physical_id: &str,
attribute: &str,
) -> Option<String> {
let mut accounts = self.dynamodb_state.write();
let state = accounts.get_or_create(&self.account_id);
let table = state.tables.get(physical_id)?;
match attribute {
"Arn" => Some(table.arn.clone()),
"StreamArn" => table.stream_arn.clone(),
_ => None,
}
}
pub(super) fn create_dynamodb_table(
&self,
resource: &ResourceDefinition,
) -> Result<ProvisionResult, String> {
let props = &resource.properties;
let table_name = props
.get("TableName")
.and_then(|v| v.as_str())
.unwrap_or(&resource.logical_id);
let mut key_schema = Vec::new();
if let Some(ks) = props.get("KeySchema").and_then(|v| v.as_array()) {
for item in ks {
let attr_name = item
.get("AttributeName")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let key_type = item
.get("KeyType")
.and_then(|v| v.as_str())
.unwrap_or("HASH")
.to_string();
key_schema.push(KeySchemaElement {
attribute_name: attr_name,
key_type,
});
}
}
let mut attribute_definitions = Vec::new();
if let Some(defs) = props.get("AttributeDefinitions").and_then(|v| v.as_array()) {
for item in defs {
let attr_name = item
.get("AttributeName")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let attr_type = item
.get("AttributeType")
.and_then(|v| v.as_str())
.unwrap_or("S")
.to_string();
attribute_definitions.push(AttributeDefinition {
attribute_name: attr_name,
attribute_type: attr_type,
});
}
}
let billing_mode = props
.get("BillingMode")
.and_then(|v| v.as_str())
.unwrap_or("PAY_PER_REQUEST")
.to_string();
let provisioned_throughput = if billing_mode == "PROVISIONED" {
if let Some(pt) = props.get("ProvisionedThroughput") {
ProvisionedThroughput {
read_capacity_units: pt
.get("ReadCapacityUnits")
.and_then(|v| v.as_i64())
.unwrap_or(5),
write_capacity_units: pt
.get("WriteCapacityUnits")
.and_then(|v| v.as_i64())
.unwrap_or(5),
}
} else {
ProvisionedThroughput {
read_capacity_units: 5,
write_capacity_units: 5,
}
}
} else {
ProvisionedThroughput {
read_capacity_units: 0,
write_capacity_units: 0,
}
};
let (stream_enabled, stream_view_type) =
if let Some(stream_spec) = props.get("StreamSpecification") {
let view_type = stream_spec
.get("StreamViewType")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let enabled = stream_spec
.get("StreamEnabled")
.and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
.unwrap_or(view_type.is_some());
(enabled, view_type)
} else {
(false, None)
};
let deletion_protection_enabled = props
.get("DeletionProtectionEnabled")
.and_then(|v| v.as_bool().or_else(|| v.as_str().map(|s| s == "true")))
.unwrap_or(false);
let on_demand_throughput = props
.get("OnDemandThroughput")
.map(|odt| OnDemandThroughput {
max_read_request_units: odt
.get("MaxReadRequestUnits")
.and_then(|v| v.as_i64())
.unwrap_or(-1),
max_write_request_units: odt
.get("MaxWriteRequestUnits")
.and_then(|v| v.as_i64())
.unwrap_or(-1),
});
let mut __ddb_mas = self.dynamodb_state.write();
let state = __ddb_mas.get_or_create(&self.account_id);
let arn = format!(
"arn:aws:dynamodb:{}:{}:table/{}",
state.region, state.account_id, table_name
);
let stream_arn = if stream_enabled {
Some(format!(
"{}/stream/{}",
arn,
Utc::now().format("%Y-%m-%dT%H:%M:%S.%3f")
))
} else {
None
};
let stream_arn_attr = stream_arn.clone();
let table = DynamoTable {
name: table_name.to_string(),
arn: arn.clone(),
table_id: Uuid::new_v4().to_string().replace('-', ""),
key_schema,
attribute_definitions,
provisioned_throughput,
items: Vec::new(),
gsi: Vec::new(),
lsi: Vec::new(),
tags: BTreeMap::new(),
created_at: Utc::now(),
status: "ACTIVE".to_string(),
item_count: 0,
size_bytes: 0,
billing_mode,
ttl_attribute: None,
ttl_enabled: false,
resource_policy: None,
pitr_enabled: false,
kinesis_destinations: Vec::new(),
contributor_insights_status: "DISABLED".to_string(),
contributor_insights_counters: BTreeMap::new(),
stream_enabled,
stream_view_type,
stream_arn,
stream_records: Arc::new(RwLock::new(Vec::new())),
sse_type: None,
sse_kms_key_arn: None,
deletion_protection_enabled,
on_demand_throughput,
};
state.tables.insert(table_name.to_string(), table);
let mut result = ProvisionResult::new(arn.clone()).with("Arn", arn);
if let Some(stream_arn_value) = stream_arn_attr {
result = result.with("StreamArn", stream_arn_value);
}
Ok(result)
}
pub(super) fn delete_dynamodb_table(&self, physical_id: &str) -> Result<(), String> {
let mut __ddb_mas = self.dynamodb_state.write();
let state = __ddb_mas.get_or_create(&self.account_id);
let table_name = state
.tables
.iter()
.find(|(_, t)| t.arn == physical_id)
.map(|(name, _)| name.clone());
if let Some(name) = table_name {
state.tables.remove(&name);
}
Ok(())
}
}