import json
import os
import time
import boto3
import pytest
ENDPOINT = os.environ.get("KINESIS_ENDPOINT", "http://localhost:4567")
STREAM_NAME = "python-conformance"
@pytest.fixture(scope="module")
def client():
return boto3.client(
"kinesis",
endpoint_url=ENDPOINT,
region_name="us-east-1",
aws_access_key_id="test",
aws_secret_access_key="test",
)
def wait_for_active(client, stream, max_retries=30, interval=0.2):
for _ in range(max_retries):
resp = client.describe_stream(StreamName=stream)
status = resp["StreamDescription"]["StreamStatus"]
if status == "ACTIVE":
return resp
time.sleep(interval)
raise TimeoutError(f"Stream {stream!r} did not become ACTIVE")
def test_conformance(client):
client.create_stream(StreamName=STREAM_NAME, ShardCount=2)
desc = wait_for_active(client, STREAM_NAME)
stream = desc["StreamDescription"]
assert stream["StreamName"] == STREAM_NAME
assert stream["StreamStatus"] == "ACTIVE"
assert len(stream["Shards"]) == 2
listed = client.list_streams()
assert STREAM_NAME in listed["StreamNames"]
put = client.put_record(
StreamName=STREAM_NAME,
Data=b"hello from python",
PartitionKey="pk-1",
)
assert put["ShardId"]
assert put["SequenceNumber"]
shard_id = put["ShardId"]
records = client.put_records(
StreamName=STREAM_NAME,
Records=[
{"Data": f"batch-{i}".encode(), "PartitionKey": f"pk-{i}"}
for i in range(3)
],
)
assert records["FailedRecordCount"] == 0
assert len(records["Records"]) == 3
shard_iter = client.get_shard_iterator(
StreamName=STREAM_NAME,
ShardId=shard_id,
ShardIteratorType="TRIM_HORIZON",
)
iterator = shard_iter["ShardIterator"]
assert iterator
got = client.get_records(ShardIterator=iterator)
assert len(got["Records"]) >= 1
assert got["Records"][0]["Data"] == b"hello from python"
client.delete_stream(StreamName=STREAM_NAME)
def test_tagging(client):
stream_name = "python-tags"
client.create_stream(StreamName=stream_name, ShardCount=1)
wait_for_active(client, stream_name)
limits = client.describe_limits()
assert "ShardLimit" in limits
assert "OpenShardCount" in limits
summary = client.describe_stream_summary(StreamName=stream_name)
assert summary["StreamDescriptionSummary"]["StreamName"] == stream_name
assert summary["StreamDescriptionSummary"]["OpenShardCount"] == 1
client.add_tags_to_stream(
StreamName=stream_name,
Tags={"env": "test", "team": "platform"},
)
tags_resp = client.list_tags_for_stream(StreamName=stream_name)
assert len(tags_resp["Tags"]) == 2
client.remove_tags_from_stream(StreamName=stream_name, TagKeys=["team"])
tags_resp = client.list_tags_for_stream(StreamName=stream_name)
assert len(tags_resp["Tags"]) == 1
assert tags_resp["Tags"][0]["Key"] == "env"
desc = client.describe_stream(StreamName=stream_name)
arn = desc["StreamDescription"]["StreamARN"]
client.tag_resource(
ResourceARN=arn,
Tags={"version": "1"},
)
tags_resp = client.list_tags_for_resource(ResourceARN=arn)
tag_keys = [t["Key"] for t in tags_resp["Tags"]]
assert "version" in tag_keys
client.untag_resource(ResourceARN=arn, TagKeys=["version"])
client.delete_stream(StreamName=stream_name)
def test_stream_config(client):
stream_name = "python-config"
client.create_stream(StreamName=stream_name, ShardCount=1)
wait_for_active(client, stream_name)
client.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=48
)
desc = client.describe_stream(StreamName=stream_name)
assert desc["StreamDescription"]["RetentionPeriodHours"] == 48
client.decrease_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=24
)
client.start_stream_encryption(
StreamName=stream_name,
EncryptionType="KMS",
KeyId="alias/aws/kinesis",
)
client.stop_stream_encryption(
StreamName=stream_name,
EncryptionType="KMS",
KeyId="alias/aws/kinesis",
)
client.enable_enhanced_monitoring(
StreamName=stream_name,
ShardLevelMetrics=["IncomingBytes"],
)
client.disable_enhanced_monitoring(
StreamName=stream_name,
ShardLevelMetrics=["IncomingBytes"],
)
desc = client.describe_stream(StreamName=stream_name)
arn = desc["StreamDescription"]["StreamARN"]
client.update_stream_mode(
StreamARN=arn,
StreamModeDetails={"StreamMode": "ON_DEMAND"},
)
wait_for_active(client, stream_name)
desc = client.describe_stream(StreamName=stream_name)
assert (
desc["StreamDescription"]["StreamModeDetails"]["StreamMode"] == "ON_DEMAND"
)
client.delete_stream(StreamName=stream_name)
def test_shard_management(client):
stream_name = "python-shards"
client.create_stream(StreamName=stream_name, ShardCount=2)
wait_for_active(client, stream_name)
shards_resp = client.list_shards(StreamName=stream_name)
shards = shards_resp["Shards"]
assert len(shards) == 2
shard_0 = next(s for s in shards if s["ShardId"] == "shardId-000000000000")
starting = int(shard_0["HashKeyRange"]["StartingHashKey"])
ending = int(shard_0["HashKeyRange"]["EndingHashKey"])
midpoint = (starting + ending) // 2
client.split_shard(
StreamName=stream_name,
ShardToSplit="shardId-000000000000",
NewStartingHashKey=str(midpoint),
)
wait_for_active(client, stream_name)
shards_resp = client.list_shards(StreamName=stream_name)
assert len(shards_resp["Shards"]) >= 4
client.merge_shards(
StreamName=stream_name,
ShardToMerge="shardId-000000000002",
AdjacentShardToMerge="shardId-000000000003",
)
wait_for_active(client, stream_name)
shards_resp = client.list_shards(StreamName=stream_name)
assert len(shards_resp["Shards"]) >= 5
client.update_shard_count(
StreamName=stream_name,
TargetShardCount=1,
ScalingType="UNIFORM_SCALING",
)
wait_for_active(client, stream_name)
client.delete_stream(StreamName=stream_name)
def test_stream_consumers(client):
stream_name = "python-consumers"
client.create_stream(StreamName=stream_name, ShardCount=1)
wait_for_active(client, stream_name)
desc = client.describe_stream(StreamName=stream_name)
arn = desc["StreamDescription"]["StreamARN"]
reg = client.register_stream_consumer(
StreamARN=arn, ConsumerName="python-test-consumer"
)
consumer_arn = reg["Consumer"]["ConsumerARN"]
consumer_desc = None
for _ in range(30):
consumer_desc = client.describe_stream_consumer(ConsumerARN=consumer_arn)
if consumer_desc["ConsumerDescription"]["ConsumerStatus"] == "ACTIVE":
break
time.sleep(0.2)
assert consumer_desc["ConsumerDescription"]["ConsumerStatus"] == "ACTIVE"
assert (
consumer_desc["ConsumerDescription"]["ConsumerName"]
== "python-test-consumer"
)
consumers = client.list_stream_consumers(StreamARN=arn)
consumer_names = [c["ConsumerName"] for c in consumers["Consumers"]]
assert "python-test-consumer" in consumer_names
client.deregister_stream_consumer(ConsumerARN=consumer_arn)
client.delete_stream(StreamName=stream_name)
def test_policies(client):
stream_name = "python-policies"
client.create_stream(StreamName=stream_name, ShardCount=1)
wait_for_active(client, stream_name)
desc = client.describe_stream(StreamName=stream_name)
arn = desc["StreamDescription"]["StreamARN"]
policy = json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "test",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "kinesis:GetRecords",
"Resource": "*",
}
],
}
)
client.put_resource_policy(ResourceARN=arn, Policy=policy)
policy_resp = client.get_resource_policy(ResourceARN=arn)
assert policy_resp["Policy"]
client.delete_resource_policy(ResourceARN=arn)
policy_resp = client.get_resource_policy(ResourceARN=arn)
assert not policy_resp["Policy"]
client.delete_stream(StreamName=stream_name)