use crate::constants;
use crate::error::KinesisErrorResponse;
use crate::sequence;
use crate::shard_iterator;
use crate::store::Store;
use crate::types::ShardIteratorType;
use crate::util::current_time_ms;
use num_bigint::BigUint;
use serde_json::{Value, json};
pub async fn execute(store: &Store, data: Value) -> Result<Option<Value>, KinesisErrorResponse> {
let stream_name = data[constants::STREAM_NAME].as_str().unwrap_or("");
let shard_id_input = data[constants::SHARD_ID].as_str().unwrap_or("");
let iterator_type_raw = data[constants::SHARD_ITERATOR_TYPE]
.as_str()
.unwrap_or("")
.to_string();
let iterator_type: ShardIteratorType =
serde_json::from_value(data[constants::SHARD_ITERATOR_TYPE].clone()).map_err(|_| {
KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some(&format!(
"Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and \
StartingSequenceNumber or (2) TRIM_HORIZON or LATEST and no \
StartingSequenceNumber. Request specified {} and no StartingSequenceNumber.",
iterator_type_raw
)),
)
})?;
let starting_seq = data[constants::STARTING_SEQUENCE_NUMBER].as_str();
let timestamp = data[constants::TIMESTAMP].as_f64();
let (shard_id, shard_ix) = sequence::resolve_shard_id(shard_id_input).map_err(|_| {
KinesisErrorResponse::client_error(
constants::RESOURCE_NOT_FOUND,
Some(&format!(
"Could not find shard {} in stream {} under account {}.",
shard_id_input, stream_name, store.aws_account_id
)),
)
})?;
let stream = store.get_stream(stream_name).await.map_err(|mut err| {
if err.body.__type == constants::RESOURCE_NOT_FOUND {
err.body.message = Some(format!(
"Shard {} in stream {} under account {} does not exist",
shard_id, stream_name, store.aws_account_id
));
}
err
})?;
if shard_ix >= stream.shards.len() as i64 {
return Err(KinesisErrorResponse::client_error(
constants::RESOURCE_NOT_FOUND,
Some(&format!(
"Shard {} in stream {} under account {} does not exist",
shard_id, stream_name, store.aws_account_id
)),
));
}
let shard_seq = &stream.shards[shard_ix as usize]
.sequence_number_range
.starting_sequence_number;
let shard_seq_obj = sequence::parse_sequence(shard_seq)
.map_err(|_| KinesisErrorResponse::server_error(None, None))?;
let now = current_time_ms();
let iterator_seq;
if let Some(start_seq) = starting_seq {
if matches!(
iterator_type,
ShardIteratorType::TrimHorizon | ShardIteratorType::Latest
) {
return Err(KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some(&format!(
"Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and \
StartingSequenceNumber or (2) TRIM_HORIZON or LATEST and no \
StartingSequenceNumber. Request specified {} and also a StartingSequenceNumber.",
iterator_type_raw
)),
));
}
let seq_obj = sequence::parse_sequence(start_seq).map_err(|_| {
KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some(&format!(
"StartingSequenceNumber {} used in GetShardIterator on shard {} in stream {} \
under account {} is invalid.",
start_seq, shard_id, stream_name, store.aws_account_id
)),
)
})?;
if seq_obj.shard_ix != shard_ix {
return Err(KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some(&format!(
"Invalid StartingSequenceNumber. It encodes {}, while it was used in a call \
to a shard with {}",
sequence::shard_id_name(seq_obj.shard_ix),
shard_id
)),
));
}
if seq_obj.version != shard_seq_obj.version
|| seq_obj.shard_create_time != shard_seq_obj.shard_create_time
{
if seq_obj.version == 0 {
return Err(KinesisErrorResponse::server_error(None, None));
}
return Err(KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some(&format!(
"StartingSequenceNumber {} used in GetShardIterator on shard {} in stream {} \
under account {} is invalid because it did not come from this stream.",
start_seq, shard_id, stream_name, store.aws_account_id
)),
));
}
if iterator_type == ShardIteratorType::AtSequenceNumber {
iterator_seq = start_seq.to_string();
} else {
iterator_seq = sequence::increment_sequence(&seq_obj, None);
}
} else {
match iterator_type {
ShardIteratorType::TrimHorizon => {
iterator_seq = shard_seq.clone();
}
ShardIteratorType::Latest => {
let seq_ix = stream
.seq_ix
.get(shard_ix as usize / 5)
.and_then(|v| *v)
.unwrap_or(0);
iterator_seq = sequence::stringify_sequence(&sequence::SeqObj {
shard_create_time: shard_seq_obj.shard_create_time,
seq_ix: Some(BigUint::from(seq_ix)),
seq_time: Some(now),
shard_ix: shard_seq_obj.shard_ix,
byte1: None,
seq_rand: None,
version: 2,
});
}
ShardIteratorType::AtTimestamp => {
if timestamp.is_none() {
return Err(KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some(
"Must specify timestampInMillis parameter for iterator of type \
AT_TIMESTAMP. Current request has no timestamp parameter.",
),
));
}
let ts = timestamp.unwrap();
let timestamp_millis = (ts * 1000.0) as u64;
if timestamp_millis > now {
return Err(KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some(&format!(
"The timestampInMillis parameter cannot be greater than the \
currentTimestampInMillis. timestampInMillis: {}, \
currentTimestampInMillis: {}",
timestamp_millis, now
)),
));
}
let record_store = store.get_record_store(stream_name).await;
let range_start = format!("{}/", sequence::shard_ix_to_hex(shard_ix));
let range_end = sequence::shard_ix_to_hex(shard_ix + 1);
let mut found_seq = None;
for (key, record) in record_store.range(range_start..range_end) {
if record.approximate_arrival_timestamp >= ts {
found_seq = key.split('/').nth(1).map(|s| s.to_string());
break;
}
}
let seq = found_seq.unwrap_or_else(|| shard_seq.clone());
let result = shard_iterator::create_shard_iterator(stream_name, &shard_id, &seq);
return Ok(Some(json!({ "ShardIterator": result })));
}
ShardIteratorType::AtSequenceNumber | ShardIteratorType::AfterSequenceNumber => {
return Err(KinesisErrorResponse::client_error(
constants::INVALID_ARGUMENT,
Some(&format!(
"Must either specify (1) AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER and \
StartingSequenceNumber or (2) TRIM_HORIZON or LATEST and no \
StartingSequenceNumber. Request specified {} and no StartingSequenceNumber.",
iterator_type_raw
)),
));
}
}
}
let result = shard_iterator::create_shard_iterator(stream_name, &shard_id, &iterator_seq);
Ok(Some(json!({ "ShardIterator": result })))
}