use crate::constants;
use crate::error::KinesisErrorResponse;
use crate::store::Store;
use serde_json::Value;
pub async fn execute(store: &Store, data: Value) -> Result<Option<Value>, KinesisErrorResponse> {
let resource_arn = data[constants::RESOURCE_ARN].as_str().unwrap_or("");
if resource_arn.is_empty() {
return Err(KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some("ResourceARN is required."),
));
}
let tag_keys: Vec<String> = data[constants::TAG_KEYS]
.as_array()
.ok_or_else(|| {
KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some("TagKeys is required."),
)
})?
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
if resource_arn.contains(":stream/")
&& !resource_arn.contains("/consumer/")
&& let Some(stream_name) = store.stream_name_from_arn(resource_arn)
{
store
.update_stream(&stream_name, |stream| {
for key in &tag_keys {
stream.tags.remove(key);
}
Ok(())
})
.await?;
tracing::trace!(resource_arn, tags = tag_keys.len(), "resource untagged");
return Ok(None);
}
let mut existing = store.get_resource_tags(resource_arn).await;
for key in &tag_keys {
existing.remove(key);
}
store.put_resource_tags(resource_arn, &existing).await?;
tracing::trace!(resource_arn, tags = tag_keys.len(), "resource untagged");
Ok(None)
}