ferrokinesis 0.7.0

A local AWS Kinesis mock server for testing, written in Rust
Documentation
use crate::constants;
use crate::error::KinesisErrorResponse;
use crate::store::Store;
use serde_json::{Value, json};

pub async fn execute(store: &Store, data: Value) -> Result<Option<Value>, KinesisErrorResponse> {
    let stream_name = data[constants::STREAM_NAME].as_str().unwrap_or("");
    let metrics = data[constants::SHARD_LEVEL_METRICS]
        .as_array()
        .ok_or_else(|| {
            KinesisErrorResponse::client_error(constants::SERIALIZATION_EXCEPTION, None)
        })?;

    let to_remove: Vec<String> = metrics
        .iter()
        .filter_map(|v| v.as_str().map(|s| s.to_string()))
        .collect();

    let result = store
        .update_stream(stream_name, |stream| {
            let current: Vec<String> = stream
                .enhanced_monitoring
                .first()
                .map(|m| m.shard_level_metrics.clone())
                .unwrap_or_default();

            let desired: Vec<String> = if to_remove.contains(&"ALL".to_string()) {
                vec![]
            } else {
                current
                    .iter()
                    .filter(|m| !to_remove.contains(m))
                    .cloned()
                    .collect()
            };

            stream.enhanced_monitoring = vec![crate::types::EnhancedMonitoring {
                shard_level_metrics: desired.clone(),
            }];

            Ok(json!({
                "StreamName": stream.stream_name,
                "StreamARN": stream.stream_arn,
                "CurrentShardLevelMetrics": current,
                "DesiredShardLevelMetrics": desired,
            }))
        })
        .await?;

    tracing::trace!(stream = %stream_name, "enhanced monitoring disabled");
    Ok(Some(result))
}