ferrokinesis 0.7.0

A local AWS Kinesis mock server for testing, written in Rust
Documentation
use crate::constants;
use crate::error::KinesisErrorResponse;
use crate::sequence;
use crate::shard_iterator;
use crate::store::Store;
use crate::types::ShardIteratorType;
use crate::util::current_time_ms;
use serde_json::{Value, json};

pub async fn execute(store: &Store, data: Value) -> Result<Option<Value>, KinesisErrorResponse> {
    let stream_name = data[constants::STREAM_NAME].as_str().unwrap_or("");
    let shard_id_input = data[constants::SHARD_ID].as_str().unwrap_or("");
    let iterator_type_raw = data[constants::SHARD_ITERATOR_TYPE]
        .as_str()
        .unwrap_or("")
        .to_string();
    let iterator_type: ShardIteratorType =
        serde_json::from_value(data[constants::SHARD_ITERATOR_TYPE].clone()).map_err(|_| {
            KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and \
                     StartingSequenceNumber or (2) TRIM_HORIZON or LATEST and no \
                     StartingSequenceNumber. Request specified {} and no StartingSequenceNumber.",
                    iterator_type_raw
                )),
            )
        })?;
    let starting_seq = data[constants::STARTING_SEQUENCE_NUMBER].as_str();
    let timestamp = data[constants::TIMESTAMP].as_f64();

    let (shard_id, shard_ix) = sequence::resolve_shard_id(shard_id_input).map_err(|_| {
        KinesisErrorResponse::client_error(
            constants::RESOURCE_NOT_FOUND,
            Some(&format!(
                "Could not find shard {} in stream {} under account {}.",
                shard_id_input, stream_name, store.aws_account_id
            )),
        )
    })?;

    let stream = store.get_stream(stream_name).await.map_err(|mut err| {
        if err.body.error_type == constants::RESOURCE_NOT_FOUND {
            err.body.message = Some(format!(
                "Shard {} in stream {} under account {} does not exist",
                shard_id, stream_name, store.aws_account_id
            ));
        }
        err
    })?;

    if shard_ix >= stream.shards.len() as i64 {
        return Err(KinesisErrorResponse::shard_not_found(
            &shard_id,
            stream_name,
            &store.aws_account_id,
        ));
    }

    let shard_seq = &stream.shards[shard_ix as usize]
        .sequence_number_range
        .starting_sequence_number;
    let shard_seq_obj = sequence::parse_sequence(shard_seq)
        .map_err(|_| KinesisErrorResponse::server_error(None, None))?;

    let now = current_time_ms();

    let iterator_seq;

    if let Some(start_seq) = starting_seq {
        if matches!(
            iterator_type,
            ShardIteratorType::TrimHorizon | ShardIteratorType::Latest
        ) {
            return Err(KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and \
                     StartingSequenceNumber or (2) TRIM_HORIZON or LATEST and no \
                     StartingSequenceNumber. Request specified {} and also a StartingSequenceNumber.",
                    iterator_type_raw
                )),
            ));
        }

        let seq_obj = sequence::parse_sequence(start_seq).map_err(|_| {
            KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "StartingSequenceNumber {} used in GetShardIterator on shard {} in stream {} \
                     under account {} is invalid.",
                    start_seq, shard_id, stream_name, store.aws_account_id
                )),
            )
        })?;

        if seq_obj.shard_ix != shard_ix {
            return Err(KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "Invalid StartingSequenceNumber. It encodes {}, while it was used in a call \
                     to a shard with {}",
                    sequence::shard_id_name(seq_obj.shard_ix),
                    shard_id
                )),
            ));
        }

        // Cross-shard-lifecycle guard: every sequence number embeds the shard's
        // create_time. A mismatch means the number was generated by a different
        // incarnation of this shard (e.g. after a split/merge cycle) or belongs to a
        // different stream entirely and must be rejected. Version 0 (kinesalite legacy)
        // uses a server error rather than a client error to match real AWS behaviour
        // for that format — this is an intentional quirk, not a mistake.
        if seq_obj.version != shard_seq_obj.version
            || seq_obj.shard_create_time != shard_seq_obj.shard_create_time
        {
            if seq_obj.version == 0 {
                return Err(KinesisErrorResponse::server_error(None, None));
            }
            return Err(KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "StartingSequenceNumber {} used in GetShardIterator on shard {} in stream {} \
                     under account {} is invalid because it did not come from this stream.",
                    start_seq, shard_id, stream_name, store.aws_account_id
                )),
            ));
        }

        if iterator_type == ShardIteratorType::AtSequenceNumber {
            iterator_seq = start_seq.to_string();
        } else {
            // AfterSequenceNumber
            iterator_seq = sequence::increment_sequence(&seq_obj, None);
        }
    } else {
        match iterator_type {
            ShardIteratorType::TrimHorizon => {
                iterator_seq = shard_seq.clone();
            }
            ShardIteratorType::Latest => {
                // Read the per-shard atomic counter for the current write frontier.
                let seq_ix = store.current_shard_seq(stream_name, shard_ix).await;
                iterator_seq = sequence::stringify_sequence(&sequence::SeqObj {
                    shard_create_time: shard_seq_obj.shard_create_time,
                    seq_ix: Some(seq_ix),
                    seq_time: Some(now),
                    shard_ix: shard_seq_obj.shard_ix,
                    byte1: None,
                    seq_rand: None,
                    version: 2,
                });
            }
            ShardIteratorType::AtTimestamp => {
                if timestamp.is_none() {
                    return Err(KinesisErrorResponse::client_error(
                        constants::INVALID_ARGUMENT,
                        Some(
                            "Must specify timestampInMillis parameter for iterator of type \
                             AT_TIMESTAMP. Current request has no timestamp parameter.",
                        ),
                    ));
                }
                let ts = timestamp.unwrap();
                let timestamp_millis = (ts * 1000.0) as u64;
                if timestamp_millis > now {
                    return Err(KinesisErrorResponse::client_error(
                        constants::INVALID_ARGUMENT,
                        Some(&format!(
                            "The timestampInMillis parameter cannot be greater than the \
                             currentTimestampInMillis. timestampInMillis: {}, \
                             currentTimestampInMillis: {}",
                            timestamp_millis, now
                        )),
                    ));
                }

                let range_start = format!("{}/", sequence::shard_ix_to_hex(shard_ix));
                let range_end = sequence::shard_ix_to_hex(shard_ix + 1);
                let found_seq = store
                    .find_first_record_at_timestamp(stream_name, &range_start, &range_end, ts)
                    .await
                    .and_then(|(key, _)| key.split('/').nth(1).map(|s| s.to_string()));

                let seq = found_seq.unwrap_or_else(|| shard_seq.clone());
                let result =
                    shard_iterator::create_shard_iterator(stream_name, &shard_id, &seq, now);
                tracing::trace!(stream = %stream_name, shard = %shard_id, iterator_type = %iterator_type_raw, "shard iterator created");
                store.record_iterator_created(now).await;
                return Ok(Some(json!({ "ShardIterator": result })));
            }
            ShardIteratorType::AtSequenceNumber | ShardIteratorType::AfterSequenceNumber => {
                return Err(KinesisErrorResponse::client_error(
                    constants::INVALID_ARGUMENT,
                    Some(&format!(
                        "Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and \
                         StartingSequenceNumber or (2) TRIM_HORIZON or LATEST and no \
                         StartingSequenceNumber. Request specified {} and no StartingSequenceNumber.",
                        iterator_type_raw
                    )),
                ));
            }
        }
    }

    let result = shard_iterator::create_shard_iterator(stream_name, &shard_id, &iterator_seq, now);
    tracing::trace!(stream = %stream_name, shard = %shard_id, iterator_type = %iterator_type_raw, "shard iterator created");
    store.record_iterator_created(now).await;
    Ok(Some(json!({ "ShardIterator": result })))
}