Skip to main content

Crate oxigdal_kinesis

Crate oxigdal_kinesis 

Source
Expand description

AWS Kinesis streaming integration for OxiGDAL

This crate provides comprehensive AWS Kinesis integration for OxiGDAL, including:

  • Kinesis Data Streams: Producer with KPL patterns, enhanced fan-out consumer, shard management, DynamoDB checkpointing
  • Kinesis Firehose: Delivery streams with transformations, S3/Redshift/Elasticsearch destinations
  • Kinesis Analytics: SQL queries on streams, tumbling/sliding/session windows, real-time analytics
  • Monitoring: CloudWatch metrics, stream monitoring, alerting system

§Features

  • streams - Kinesis Data Streams support (default)
  • firehose - Kinesis Firehose support (default)
  • analytics - Kinesis Analytics support (default)
  • monitoring - CloudWatch monitoring and metrics (default)
  • checkpointing - DynamoDB checkpointing for consumers
  • enhanced-fanout - Enhanced fan-out consumer support
  • compression - Data compression support

§Examples

§Kinesis Data Streams - Producer

use oxigdal_kinesis::streams::{Producer, ProducerConfig, Record};
use bytes::Bytes;

// Create AWS Kinesis client
let config = aws_config::load_from_env().await;
let client = aws_sdk_kinesis::Client::new(&config);

// Configure producer
let producer_config = ProducerConfig::new("my-stream")
    .with_buffer_size(1000)
    .with_linger_ms(100);

let producer = Producer::new(client, producer_config).await?;

// Send records
let record = Record::new("partition-key-1", Bytes::from("data"));
producer.send(record).await?;

// Flush pending records
producer.flush().await?;

§Kinesis Data Streams - Consumer

use oxigdal_kinesis::streams::{Consumer, ConsumerConfig};

let config = aws_config::load_from_env().await;
let client = aws_sdk_kinesis::Client::new(&config);

let consumer_config = ConsumerConfig::new("my-stream")
    .with_max_records(100);

let mut consumer = Consumer::new(client, consumer_config, "shard-0001").await?;

// Poll for records
let records = consumer.poll().await?;
for record in records {
    println!("Received: {:?}", record.data);
}

§Kinesis Firehose

use oxigdal_kinesis::firehose::{DeliveryStream, DeliveryStreamConfig, FirehoseRecord};
use oxigdal_kinesis::firehose::destination::S3DestinationConfig;
use bytes::Bytes;

let config = aws_config::load_from_env().await;
let client = aws_sdk_firehose::Client::new(&config);

let s3_config = S3DestinationConfig::new(
    "arn:aws:s3:::my-bucket",
    "arn:aws:iam::123456789012:role/firehose-role",
    "data/",
);

let stream_config = DeliveryStreamConfig::new("my-delivery-stream")
    .with_s3_destination(s3_config);

let mut delivery_stream = DeliveryStream::new(client, stream_config);
delivery_stream.start().await?;

// Send record
let record = FirehoseRecord::new(Bytes::from("data"));
delivery_stream.send_record(record).await?;

§Kinesis Analytics

use oxigdal_kinesis::analytics::sql::QueryBuilder;

// Build SQL query
let query = QueryBuilder::new()
    .select("userId")
    .select("COUNT(*) as event_count")
    .from("SOURCE_SQL_STREAM")
    .window("WINDOW TUMBLING (SIZE 1 MINUTE)")
    .group_by("userId")
    .build();

println!("Query: {}", query.as_str());

Re-exports§

pub use error::KinesisError;
pub use error::Result;

Modules§

analytics
Kinesis Analytics module for real-time stream processing
error
Error types for OxiGDAL Kinesis integration
firehose
Kinesis Firehose module for delivery streams
monitoring
Monitoring and metrics for Kinesis streams
streams
Kinesis Data Streams module

Structs§

KinesisClient
Kinesis client wrapper providing access to all Kinesis services