ferrokinesis 0.1.0

A local AWS Kinesis mock server for testing, written in Rust
Documentation
use crate::constants;
use crate::error::KinesisErrorResponse;
use crate::sequence;
use crate::shard_iterator;
use crate::store::Store;
use crate::util::current_time_ms;
use num_bigint::BigUint;
use serde_json::{Value, json};

pub async fn execute(store: &Store, data: Value) -> Result<Option<Value>, KinesisErrorResponse> {
    let stream_name = data[constants::STREAM_NAME].as_str().unwrap_or("");
    let shard_id_input = data[constants::SHARD_ID].as_str().unwrap_or("");
    let iterator_type = data[constants::SHARD_ITERATOR_TYPE].as_str().unwrap_or("");
    let starting_seq = data[constants::STARTING_SEQUENCE_NUMBER].as_str();
    let timestamp = data["Timestamp"].as_f64();

    let (shard_id, shard_ix) = sequence::resolve_shard_id(shard_id_input).map_err(|_| {
        KinesisErrorResponse::client_error(
            constants::RESOURCE_NOT_FOUND,
            Some(&format!(
                "Could not find shard {} in stream {} under account {}.",
                shard_id_input, stream_name, store.aws_account_id
            )),
        )
    })?;

    let stream = store.get_stream(stream_name).await.map_err(|mut err| {
        if err.body.__type == constants::RESOURCE_NOT_FOUND {
            err.body.message = Some(format!(
                "Shard {} in stream {} under account {} does not exist",
                shard_id, stream_name, store.aws_account_id
            ));
        }
        err
    })?;

    if shard_ix >= stream.shards.len() as i64 {
        return Err(KinesisErrorResponse::client_error(
            constants::RESOURCE_NOT_FOUND,
            Some(&format!(
                "Shard {} in stream {} under account {} does not exist",
                shard_id, stream_name, store.aws_account_id
            )),
        ));
    }

    let shard_seq = &stream.shards[shard_ix as usize]
        .sequence_number_range
        .starting_sequence_number;
    let shard_seq_obj = sequence::parse_sequence(shard_seq)
        .map_err(|_| KinesisErrorResponse::server_error(None, None))?;

    let now = current_time_ms();

    let iterator_seq;

    if let Some(start_seq) = starting_seq {
        if iterator_type == "TRIM_HORIZON" || iterator_type == "LATEST" {
            return Err(KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and StartingSequenceNumber or \
                     (2) TRIM_HORIZON or LATEST and no StartingSequenceNumber. \
                     Request specified {} and also a StartingSequenceNumber.",
                    iterator_type
                )),
            ));
        }

        let seq_obj = sequence::parse_sequence(start_seq).map_err(|_| {
            KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "StartingSequenceNumber {} used in GetShardIterator on shard {} in stream {} under account {} is invalid.",
                    start_seq, shard_id, stream_name, store.aws_account_id
                )),
            )
        })?;

        if seq_obj.shard_ix != shard_ix {
            return Err(KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "Invalid StartingSequenceNumber. It encodes {}, while it was used in a call to a shard with {}",
                    sequence::shard_id_name(seq_obj.shard_ix),
                    shard_id
                )),
            ));
        }

        if seq_obj.version != shard_seq_obj.version
            || seq_obj.shard_create_time != shard_seq_obj.shard_create_time
        {
            if seq_obj.version == 0 {
                return Err(KinesisErrorResponse::server_error(None, None));
            }
            return Err(KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "StartingSequenceNumber {} used in GetShardIterator on shard {} in stream {} under account {} is invalid because it did not come from this stream.",
                    start_seq, shard_id, stream_name, store.aws_account_id
                )),
            ));
        }

        if iterator_type == "AT_SEQUENCE_NUMBER" {
            iterator_seq = start_seq.to_string();
        } else {
            // AFTER_SEQUENCE_NUMBER
            iterator_seq = sequence::increment_sequence(&seq_obj, None);
        }
    } else if iterator_type == "TRIM_HORIZON" {
        iterator_seq = shard_seq.clone();
    } else if iterator_type == "LATEST" {
        let seq_ix = stream
            .seq_ix
            .get(shard_ix as usize / 5)
            .and_then(|v| *v)
            .unwrap_or(0);
        iterator_seq = sequence::stringify_sequence(&sequence::SeqObj {
            shard_create_time: shard_seq_obj.shard_create_time,
            seq_ix: Some(BigUint::from(seq_ix)),
            seq_time: Some(now),
            shard_ix: shard_seq_obj.shard_ix,
            byte1: None,
            seq_rand: None,
            version: 2,
        });
    } else if iterator_type == "AT_TIMESTAMP" {
        if timestamp.is_none() {
            return Err(KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(
                    "Must specify timestampInMillis parameter for iterator of type AT_TIMESTAMP. Current request has no timestamp parameter.",
                ),
            ));
        }
        let ts = timestamp.unwrap();
        let timestamp_millis = (ts * 1000.0) as u64;
        if timestamp_millis > now {
            return Err(KinesisErrorResponse::client_error(
                constants::INVALID_ARGUMENT,
                Some(&format!(
                    "The timestampInMillis parameter cannot be greater than the currentTimestampInMillis. timestampInMillis: {}, currentTimestampInMillis: {}",
                    timestamp_millis, now
                )),
            ));
        }

        // Find first record at or after timestamp
        let record_store = store.get_record_store(stream_name).await;
        let range_start = format!("{}/", sequence::shard_ix_to_hex(shard_ix));
        let range_end = sequence::shard_ix_to_hex(shard_ix + 1);

        let mut found_seq = None;
        for (key, record) in record_store.range(range_start..range_end) {
            if record.approximate_arrival_timestamp >= ts {
                found_seq = key.split('/').nth(1).map(|s| s.to_string());
                break;
            }
        }

        if let Some(seq) = found_seq {
            iterator_seq = seq;
        } else {
            // No record found, use current position
            iterator_seq = shard_seq.clone();
        }

        let result = shard_iterator::create_shard_iterator(stream_name, &shard_id, &iterator_seq);
        return Ok(Some(json!({ "ShardIterator": result })));
    } else {
        return Err(KinesisErrorResponse::client_error(
            constants::INVALID_ARGUMENT,
            Some(&format!(
                "Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and StartingSequenceNumber or \
                 (2) TRIM_HORIZON or LATEST and no StartingSequenceNumber. \
                 Request specified {} and no StartingSequenceNumber.",
                iterator_type
            )),
        ));
    }

    let result = shard_iterator::create_shard_iterator(stream_name, &shard_id, &iterator_seq);
    Ok(Some(json!({ "ShardIterator": result })))
}