// ferrokinesis 0.1.0
//
// A local AWS Kinesis mock server for testing, written in Rust.
// Documentation
use crate::constants;
use crate::error::KinesisErrorResponse;
use crate::store::Store;
use serde_json::Value;

pub async fn execute(store: &Store, data: Value) -> Result<Option<Value>, KinesisErrorResponse> {
    let resource_arn = data[constants::RESOURCE_ARN].as_str().unwrap_or("");

    if resource_arn.is_empty() {
        return Err(KinesisErrorResponse::client_error(
            constants::INVALID_ARGUMENT,
            Some("ResourceARN is required."),
        ));
    }

    let tag_keys: Vec<String> = data[constants::TAG_KEYS]
        .as_array()
        .ok_or_else(|| {
            KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some("TagKeys is required."),
            )
        })?
        .iter()
        .filter_map(|v| v.as_str().map(|s| s.to_string()))
        .collect();

    // Check if this is a stream ARN
    if resource_arn.contains(":stream/")
        && !resource_arn.contains("/consumer/")
        && let Some(stream_name) = store.stream_name_from_arn(resource_arn)
    {
        store
            .update_stream(&stream_name, |stream| {
                for key in &tag_keys {
                    stream.tags.remove(key);
                }
                Ok(())
            })
            .await?;
        return Ok(None);
    }

    // For non-stream resources
    let mut existing = store.get_resource_tags(resource_arn).await;
    for key in &tag_keys {
        existing.remove(key);
    }
    store.put_resource_tags(resource_arn, &existing).await;

    Ok(None)
}