aws_utils_kinesis_data_streams 0.3.0

AWS Kinesis Data Streams utilities for Rust
# Kinesis Data Streams

A Rust library providing utilities for AWS Kinesis Data Streams operations with built-in retry logic and batch processing capabilities.

## Features

- **Simple API**: Easy-to-use functions for putting records to Kinesis Data Streams
- **Batch Processing**: Efficient batch record operations with automatic size and count validation
- **Records Builder**: Builder pattern for constructing batches of records with size constraints
- **Error Handling**: Comprehensive error handling with custom error types
- **Retry Logic**: Built-in retry mechanisms for handling transient failures
- **AWS SDK Integration**: Built on top of the official AWS SDK for Rust
- **Testing Support**: Comprehensive unit tests with mocking capabilities

## Installation

Add this to your `Cargo.toml`:

```toml
[dependencies]
aws_utils_kinesis_data_streams = "0.3.0"
```

## Usage

### Basic Usage

```rust
use aws_utils_kinesis_data_streams::{make_client_with_timeout_default, kinesis_data_streams};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a Kinesis client with default timeout settings
    let client = make_client_with_timeout_default(None).await;
    
    // Put a single record
    let result = kinesis_data_streams::add_record(
        &client,
        "my-stream",
        "partition-key",
        "Hello, Kinesis!".to_string(),
    ).await?;
    
    println!("Record added with sequence number: {}", result.sequence_number());
    
    Ok(())
}
```

### Batch Processing with RecordsBuilder

```rust
use aws_utils_kinesis_data_streams::{make_client_with_timeout_default, kinesis_data_streams, RecordsBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = make_client_with_timeout_default(None).await;
    
    // Build a batch of records
    let mut builder = RecordsBuilder::new();
    builder.add_entry_data("Record 1".to_string())?;
    builder.add_entry_data("Record 2".to_string())?;
    builder.add_entry("Record 3".to_string(), Some("custom-partition".to_string()), None)?;
    
    // Send the batch
    let records = builder.build();
    let result = kinesis_data_streams::add_records(&client, "my-stream", records).await?;
    
    println!("Batch sent with {} failed records", result.failed_record_count().unwrap_or(0));
    
    Ok(())
}
```
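A PutRecords call can succeed as a whole while individual records in it fail (for example with `ProvisionedThroughputExceededException` or `InternalFailure`); that is what the failed-record count above reports. Whether the crate's built-in retry resends such records is not documented here, so the std-only sketch below shows one way to select the records to resend. `RecordResult` and `failed_records` are hypothetical names, not part of this crate; in the AWS SDK for Rust the per-record results come from `PutRecordsOutput::records()`, in the same order as the records in the request.

```rust
/// Hypothetical per-record outcome, loosely modelled on the SDK's PutRecords
/// result entries: `error_code` is `Some(..)` when that record was rejected.
#[derive(Debug)]
struct RecordResult {
    error_code: Option<String>,
}

/// Collects the submitted records whose result carries an error code.
/// Assumes `results` is in the same order as `sent`, as PutRecords reports them.
fn failed_records<T: Clone>(sent: &[T], results: &[RecordResult]) -> Vec<T> {
    sent.iter()
        .zip(results)
        .filter(|(_, result)| result.error_code.is_some())
        .map(|(record, _)| record.clone())
        .collect()
}

fn main() {
    let sent = vec!["Record 1", "Record 2", "Record 3"];
    // Simulated response: only the second record was throttled.
    let results = vec![
        RecordResult { error_code: None },
        RecordResult { error_code: Some("ProvisionedThroughputExceededException".to_string()) },
        RecordResult { error_code: None },
    ];

    let retry = failed_records(&sent, &results);
    println!("records to resend: {:?}", retry); // prints: records to resend: ["Record 2"]
}
```

Resending only the failed subset, rather than the whole batch, avoids writing the successful records to the stream twice.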

### Custom Endpoint (for testing)

```rust
use aws_utils_kinesis_data_streams::make_client_with_timeout_default;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Use a custom endpoint (e.g., for LocalStack)
    let client = make_client_with_timeout_default(Some("http://localhost:4566".to_string())).await;
    
    // Your Kinesis operations here...
    
    Ok(())
}
```

### Timeout Configuration

```rust
use aws_utils_kinesis_data_streams::{make_client, make_client_with_timeout, make_client_with_timeout_default};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Use default timeout settings (recommended)
    let client = make_client_with_timeout_default(None).await;
    
    // Use custom timeout settings
    let client = make_client_with_timeout(
        None, // endpoint_url
        Some(Duration::from_millis(3100)), // connect_timeout
        Some(Duration::from_secs(60)),   // operation_timeout
        Some(Duration::from_secs(55)),   // operation_attempt_timeout
        Some(Duration::from_secs(50)),   // read_timeout
    ).await;
    
    // Use legacy client without timeout configuration
    let client = make_client(None, None).await;
    
    Ok(())
}
```

## API Reference

### Functions

- `make_client_with_timeout_default(endpoint_url: Option<String>)` - Creates a Kinesis client with default timeout settings
- `make_client_with_timeout(endpoint_url, connect_timeout, operation_timeout, operation_attempt_timeout, read_timeout)` - Creates a Kinesis client with custom timeout settings
- `make_client(endpoint_url: Option<String>, timeout_config: Option<TimeoutConfig>)` - Creates a Kinesis client with optional custom endpoint and timeout configuration
- `kinesis_data_streams::add_record(client, stream_name, partition_key, data)` - Puts a single record
- `kinesis_data_streams::add_records(client, stream_name, records)` - Puts multiple records in batch

### RecordsBuilder

A builder for creating batches of records with automatic size validation:

- `new()` - Creates a new builder with default AWS limits
- `new_with_limit(single_limit, total_limit, record_limit)` - Creates a builder with custom limits
- `add_entry_data(data)` - Adds a record with auto-generated partition key
- `add_entry(data, partition_key, explicit_hash_key)` - Adds a record with custom keys
- `build()` - Builds the final vector of records
- `len()` - Returns the number of records
- `is_empty()` - Checks if the builder is empty

### Error Handling

The library provides comprehensive error handling through the `Error` enum using the `thiserror` crate:

```rust
use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
    #[error(transparent)]
    BuildError(#[from] Box<aws_sdk_kinesis::error::BuildError>),
    
    #[error("EntryOverAll {0}")]
    EntryOverAll(String),
    
    #[error("EntryOverItem {0}")]
    EntryOverItem(String),
    
    #[error(transparent)]
    AwsSdk(#[from] Box<aws_sdk_kinesis::Error>),
}
```

Error variants:
- `BuildError` - Errors when building AWS SDK request entries
- `EntryOverItem` - Individual record exceeds the 1 MiB size limit
- `EntryOverAll` - Adding a record would exceed the batch limits (5 MiB total or 500 records)
- `AwsSdk` - General AWS SDK errors (network issues, authentication, etc.)

#### Error Handling Example

```rust
use aws_utils_kinesis_data_streams::{make_client_with_timeout_default, kinesis_data_streams, RecordsBuilder, error::Error};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = make_client_with_timeout_default(None).await;
    
    match kinesis_data_streams::add_record(&client, "my-stream", "key", "data".to_string()).await {
        Ok(output) => println!("Success: {}", output.sequence_number()),
        Err(Error::AwsSdk(e)) => {
            // Handle AWS SDK errors (e.g., stream not found, throttling)
            eprintln!("AWS error: {}", e);
        }
        Err(e) => eprintln!("Other error: {}", e),
    }
    
    // Batch operations with size limit handling
    let mut builder = RecordsBuilder::new();
    match builder.add_entry_data("Large data...".to_string()) {
        Ok(()) => println!("Record added to batch"),
        Err(Error::EntryOverItem(msg)) => {
            // Single record too large
            eprintln!("Record too large: {}", msg);
        }
        Err(Error::EntryOverAll(msg)) => {
            // Batch is full: send the current batch, then add this record to a new builder
            eprintln!("Batch full: {}", msg);
        }
        Err(e) => eprintln!("Unexpected error: {}", e),
    }
    
    Ok(())
}
```
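When `add_entry_data` returns `EntryOverAll`, the batch is full and has to be sent before more records can be added. The std-only sketch below shows that flush-and-continue pattern over plain byte payloads. `split_into_batches` is a hypothetical helper, not part of the crate; with the real crate, each completed batch would instead be built with `RecordsBuilder` and passed to `add_records`.

```rust
/// Hypothetical helper: groups payloads into batches, starting a new batch whenever
/// the next payload would break the byte budget or record count (the `EntryOverAll` case).
/// Payloads larger than `max_record_bytes` can never be sent and are returned separately
/// (the `EntryOverItem` case). Assumes `max_record_bytes <= max_batch_bytes` and `max_records >= 1`.
fn split_into_batches(
    payloads: Vec<Vec<u8>>,
    max_record_bytes: usize,
    max_batch_bytes: usize,
    max_records: usize,
) -> (Vec<Vec<Vec<u8>>>, Vec<Vec<u8>>) {
    let mut batches = Vec::new();
    let mut rejected = Vec::new();
    let mut current: Vec<Vec<u8>> = Vec::new();
    let mut current_bytes = 0;

    for payload in payloads {
        if payload.len() > max_record_bytes {
            rejected.push(payload);
            continue;
        }
        let full = current.len() >= max_records || current_bytes + payload.len() > max_batch_bytes;
        if full && !current.is_empty() {
            // Close the full batch (this is where it would be sent) and start a fresh one.
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += payload.len();
        current.push(payload);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    (batches, rejected)
}

fn main() {
    let payloads: Vec<Vec<u8>> = ["aaa", "bb", "cccccc", "d", "ee", "f"]
        .iter()
        .map(|s| s.as_bytes().to_vec())
        .collect();
    // 4 bytes per record, 5 bytes per batch, at most 2 records per batch.
    let (batches, rejected) = split_into_batches(payloads, 4, 5, 2);
    for (i, batch) in batches.iter().enumerate() {
        println!("batch {}: {} record(s)", i, batch.len());
    }
    println!("rejected: {}", rejected.len());
}
```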

## AWS Kinesis Limits

The library respects AWS Kinesis Data Streams limits:

- **Single Record**: Maximum 1 MiB per record
- **Batch Operation**: Maximum 5 MiB total payload and 500 records per batch
- **Partition Key**: Maximum 256 UTF-8 characters

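`RecordsBuilder` enforces these limits before any request is sent: an oversized record is rejected with `EntryOverItem`, and a record that would overflow the batch is rejected with `EntryOverAll`, so the errors surface locally instead of as API failures.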
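The partition-key limit is stated in characters while the size limits are in bytes, so the two need different checks. The std-only sketch below shows a character-based key check; it is not the crate's code, `validate_partition_key` and `KeyError` are hypothetical names, and the one-character minimum comes from the Kinesis API rather than this README. Counting Unicode scalar values with `chars()` is an assumption about how Kinesis counts characters.

```rust
/// Maximum partition-key length from the limits listed above.
const MAX_PARTITION_KEY_CHARS: usize = 256;

#[derive(Debug, PartialEq)]
enum KeyError {
    /// Kinesis requires at least one character.
    Empty,
    /// Key length in characters, when it exceeds the maximum.
    TooLong(usize),
}

/// Hypothetical pre-flight check. Counts Unicode scalar values with `chars()`
/// (an assumption about how Kinesis counts "characters"), not UTF-8 bytes.
fn validate_partition_key(key: &str) -> Result<(), KeyError> {
    let chars = key.chars().count();
    if chars == 0 {
        Err(KeyError::Empty)
    } else if chars > MAX_PARTITION_KEY_CHARS {
        Err(KeyError::TooLong(chars))
    } else {
        Ok(())
    }
}

fn main() {
    // 256 copies of U+00E9: 256 characters but 512 UTF-8 bytes.
    let accented = "\u{e9}".repeat(256);
    println!("{} bytes -> {:?}", accented.len(), validate_partition_key(&accented)); // 512 bytes -> Ok(())
    println!("{:?}", validate_partition_key(&"k".repeat(257))); // Err(TooLong(257))
    println!("{:?}", validate_partition_key("")); // Err(Empty)
}
```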

## Testing

Run the test suite:

```bash
cargo test
```

To run a single integration test with specific environment variables set, log output enabled, and tests run on one thread:

```bash
RUST_LOG=info REALM_CODE=test cargo test test_kinesis_data_streams_records -- --nocapture --test-threads=1
```

The library includes comprehensive unit tests with mocking capabilities using `mockito` for testing without actual AWS resources.

## Configuration

### Authentication

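The client uses the AWS SDK's default credential provider chain, which checks sources such as:

- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and optionally `AWS_SESSION_TOKEN`)
- Shared AWS config and credentials files (`~/.aws/config`, `~/.aws/credentials`)
- ECS task role (for Fargate/ECS)
- EC2 instance profile
- Other configured credential providers

The region is resolved separately, for example from the `AWS_REGION` environment variable.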

## Dependencies

- `aws-config` - AWS configuration management
- `aws-sdk-kinesis` - Official AWS Kinesis SDK
- `thiserror` - Error handling
- `tracing` - Logging and tracing
- `uuid` - UUID generation for partition keys

## License

This project is licensed under either of

- Apache License, Version 2.0
- MIT License

at your option.

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.