Kinesis Data Streams
A Rust library providing utilities for AWS Kinesis Data Streams operations with built-in retry logic and batch processing capabilities.
Features
- Simple API: Easy-to-use functions for putting records to Kinesis Data Streams
- Batch Processing: Efficient batch record operations with automatic size and count validation
- Records Builder: Builder pattern for constructing batches of records with size constraints
- Error Handling: Comprehensive error handling with custom error types
- Retry Logic: Built-in retry mechanisms for handling transient failures
- AWS SDK Integration: Built on top of the official AWS SDK for Rust
- Testing Support: Comprehensive unit tests with mocking capabilities
Installation
Add this to your Cargo.toml:
```toml
[dependencies]
kinesis_data_streams = "0.1.0"
```
Usage
Basic Usage
Put a single record to a stream with `kinesis_data_streams::add_record`.
Batch Processing with RecordsBuilder
Build a batch with `RecordsBuilder` and send it with `kinesis_data_streams::add_records`.
Custom Endpoint (for testing)
Point the client at a custom endpoint (for example, a local Kinesis emulator) by passing an endpoint URL to `make_client_with_timeout_default`.
Timeout Configuration
Configure connect, operation, and read timeouts with `make_client_with_timeout`, using `std::time::Duration` values.
API Reference
Functions
- `make_client_with_timeout_default(endpoint_url: Option<String>)` - Creates a Kinesis client with default timeout settings
- `make_client_with_timeout(endpoint_url, connect_timeout, operation_timeout, operation_attempt_timeout, read_timeout)` - Creates a Kinesis client with custom timeout settings
- `make_client(endpoint_url: Option<String>, timeout_config: Option<TimeoutConfig>)` - Creates a Kinesis client with an optional custom endpoint and timeout configuration
- `kinesis_data_streams::add_record(client, stream_name, partition_key, data)` - Puts a single record
- `kinesis_data_streams::add_records(client, stream_name, records)` - Puts multiple records in a batch
RecordsBuilder
A builder for creating batches of records with automatic size validation:
- `new()` - Creates a new builder with default AWS limits
- `new_with_limit(single_limit, total_limit, record_limit)` - Creates a builder with custom limits
- `add_entry_data(data)` - Adds a record with an auto-generated partition key
- `add_entry(data, partition_key, explicit_hash_key)` - Adds a record with custom keys
- `build()` - Builds the final vector of records
- `len()` - Returns the number of records
- `is_empty()` - Checks whether the builder is empty
Error Handling
The library provides comprehensive error handling through the Error enum using the thiserror crate:
Error variants:
- `BuildError` - Errors when building AWS SDK request entries
- `EntryOverItem` - An individual record exceeds the 1MB size limit
- `EntryOverAll` - Adding a record would exceed batch limits (5MB total or 500 records)
- `AwsSdk` - General AWS SDK errors (network issues, authentication, etc.)
Error Handling Example
Match on the library's `Error` enum to handle each failure mode separately.
AWS Kinesis Limits
The library respects AWS Kinesis Data Streams limits:
- Single Record: Maximum 1MB per record
- Batch Operation: Maximum 5MB total payload and 500 records per batch
- Partition Key: Maximum 256 UTF-8 characters
These limits are enforced by the RecordsBuilder to prevent API errors.
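The accounting behind these limits can be checked without touching AWS. This standalone sketch mirrors the kind of fit check a batch builder has to perform; the constants come from the limits listed above, and the `fits` helper is illustrative, not part of the library.

```rust
// AWS Kinesis Data Streams limits, as listed above.
const MAX_RECORD_BYTES: usize = 1024 * 1024; // 1MB per record
const MAX_BATCH_BYTES: usize = 5 * 1024 * 1024; // 5MB per PutRecords call
const MAX_BATCH_RECORDS: usize = 500; // 500 records per call

/// Returns true if a record of `record_len` bytes still fits in a batch
/// that currently holds `batch_records` records totaling `batch_bytes`.
fn fits(record_len: usize, batch_bytes: usize, batch_records: usize) -> bool {
    record_len <= MAX_RECORD_BYTES
        && batch_bytes + record_len <= MAX_BATCH_BYTES
        && batch_records + 1 <= MAX_BATCH_RECORDS
}

fn main() {
    assert!(fits(1024, 0, 0)); // small record, empty batch
    assert!(!fits(2 * 1024 * 1024, 0, 0)); // over the 1MB record limit
    assert!(!fits(1024, 5 * 1024 * 1024, 10)); // batch already at 5MB
    assert!(!fits(1024, 0, 500)); // batch already at 500 records
    println!("limit checks hold");
}
```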
Testing
Run the test suite:

```sh
cargo test
```

For integration tests that need specific environment variables:

```sh
RUST_LOG=info REALM_CODE=test cargo test
```
The library includes comprehensive unit tests with mocking capabilities using mockito for testing without actual AWS resources.
Configuration
Environment Variables
The library automatically sets dummy AWS credentials for testing if not provided:
- `AWS_ACCESS_KEY_ID`
- `AWS_SECRET_ACCESS_KEY`
- `AWS_REGION`
For production use, configure these through your preferred AWS credential provider.
Dependencies
- `aws-config` - AWS configuration management
- `aws-sdk-kinesis` - Official AWS Kinesis SDK
- `thiserror` - Error handling
- `tracing` - Logging and tracing
- `uuid` - UUID generation for partition keys
License
This project is licensed under either of
- Apache License, Version 2.0
- MIT License
at your option.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.