AWS Kinesis streaming integration for OxiGDAL
This crate integrates OxiGDAL with the AWS Kinesis family of services, including:
- Kinesis Data Streams: Producer with KPL patterns, enhanced fan-out consumer, shard management, DynamoDB checkpointing
- Kinesis Firehose: Delivery streams with transformations, S3/Redshift/Elasticsearch destinations
- Kinesis Analytics: SQL queries on streams, tumbling/sliding/session windows, real-time analytics
- Monitoring: CloudWatch metrics, stream monitoring, alerting system
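The tumbling windows mentioned in the Kinesis Analytics item above split a stream into fixed-size, non-overlapping time intervals, so each event belongs to exactly one window. The sketch below illustrates that idea in plain Rust using only the standard library. It does not use this crate's API: the names `window_start` and `count_per_window` and the `(timestamp_ms, user_id)` event shape are hypothetical and chosen for illustration only.

```rust
use std::collections::BTreeMap;

/// Start (in ms) of the tumbling window that contains `timestamp_ms`.
/// Hypothetical helper, not part of oxigdal_kinesis.
fn window_start(timestamp_ms: u64, window_size_ms: u64) -> u64 {
    timestamp_ms - (timestamp_ms % window_size_ms)
}

/// Count events per (window start, user id) pair.
/// Each event is a hypothetical `(timestamp_ms, user_id)` tuple.
fn count_per_window(
    events: &[(u64, &str)],
    window_size_ms: u64,
) -> BTreeMap<(u64, String), usize> {
    let mut counts = BTreeMap::new();
    for &(ts, user) in events {
        let key = (window_start(ts, window_size_ms), user.to_string());
        *counts.entry(key).or_insert(0) += 1;
    }
    counts
}

fn main() {
    // One-minute windows: events at 1 s and 59.999 s share a window,
    // while events at 60 s and 61.5 s fall into the next one.
    let events: [(u64, &str); 4] = [
        (1_000, "alice"),
        (59_999, "alice"),
        (60_000, "alice"),
        (61_500, "bob"),
    ];
    let counts = count_per_window(&events, 60_000);
    for ((start, user), n) in &counts {
        println!("window starting at {start} ms: {user} -> {n}");
    }
}
```

Sliding and session windows differ only in how an event is assigned to a window: a sliding window may place one event in several overlapping windows, and a session window closes after a gap of inactivity rather than at a fixed boundary.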
§Features
- streams: Kinesis Data Streams support (default)
- firehose: Kinesis Firehose support (default)
- analytics: Kinesis Analytics support (default)
- monitoring: CloudWatch monitoring and metrics (default)
- checkpointing: DynamoDB checkpointing for consumers
- enhanced-fanout: Enhanced fan-out consumer support
- compression: Data compression support
§Examples
§Kinesis Data Streams - Producer
use oxigdal_kinesis::streams::{Producer, ProducerConfig, Record};
use bytes::Bytes;
// Create AWS Kinesis client
let config = aws_config::load_from_env().await;
let client = aws_sdk_kinesis::Client::new(&config);
// Configure producer
let producer_config = ProducerConfig::new("my-stream")
    .with_buffer_size(1000)
    .with_linger_ms(100);
let producer = Producer::new(client, producer_config).await?;
// Send records
let record = Record::new("partition-key-1", Bytes::from("data"));
producer.send(record).await?;
// Flush pending records
producer.flush().await?;
§Kinesis Data Streams - Consumer
use oxigdal_kinesis::streams::{Consumer, ConsumerConfig};
let config = aws_config::load_from_env().await;
let client = aws_sdk_kinesis::Client::new(&config);
let consumer_config = ConsumerConfig::new("my-stream")
    .with_max_records(100);
let mut consumer = Consumer::new(client, consumer_config, "shard-0001").await?;
// Poll for records
let records = consumer.poll().await?;
for record in records {
    println!("Received: {:?}", record.data);
}
§Kinesis Firehose
use oxigdal_kinesis::firehose::{DeliveryStream, DeliveryStreamConfig, FirehoseRecord};
use oxigdal_kinesis::firehose::destination::S3DestinationConfig;
use bytes::Bytes;
let config = aws_config::load_from_env().await;
let client = aws_sdk_firehose::Client::new(&config);
let s3_config = S3DestinationConfig::new(
    "arn:aws:s3:::my-bucket",
    "arn:aws:iam::123456789012:role/firehose-role",
    "data/",
);
let stream_config = DeliveryStreamConfig::new("my-delivery-stream")
    .with_s3_destination(s3_config);
let mut delivery_stream = DeliveryStream::new(client, stream_config);
delivery_stream.start().await?;
// Send record
let record = FirehoseRecord::new(Bytes::from("data"));
delivery_stream.send_record(record).await?;
§Kinesis Analytics
use oxigdal_kinesis::analytics::sql::QueryBuilder;
// Build SQL query
let query = QueryBuilder::new()
    .select("userId")
    .select("COUNT(*) as event_count")
    .from("SOURCE_SQL_STREAM")
    .window("WINDOW TUMBLING (SIZE 1 MINUTE)")
    .group_by("userId")
    .build();
println!("Query: {}", query.as_str());
Re-exports§
pub use error::KinesisError;
pub use error::Result;
Modules§
- analytics: Kinesis Analytics module for real-time stream processing
- error: Error types for OxiGDAL Kinesis integration
- firehose: Kinesis Firehose module for delivery streams
- monitoring: Monitoring and metrics for Kinesis streams
- streams: Kinesis Data Streams module
Structs§
- KinesisClient: Kinesis client wrapper providing access to all Kinesis services