Skip to main content

oxigdal_kinesis/
lib.rs

1//! AWS Kinesis streaming integration for OxiGDAL
2//!
3//! This crate provides comprehensive AWS Kinesis integration for OxiGDAL, including:
4//!
5//! - **Kinesis Data Streams**: Producer with KPL patterns, enhanced fan-out consumer, shard management, DynamoDB checkpointing
6//! - **Kinesis Firehose**: Delivery streams with transformations, S3/Redshift/Elasticsearch destinations
7//! - **Kinesis Analytics**: SQL queries on streams, tumbling/sliding/session windows, real-time analytics
8//! - **Monitoring**: CloudWatch metrics, stream monitoring, alerting system
9//!
10//! # Features
11//!
12//! - `streams` - Kinesis Data Streams support (default)
13//! - `firehose` - Kinesis Firehose support (default)
14//! - `analytics` - Kinesis Analytics support (default)
15//! - `monitoring` - CloudWatch monitoring and metrics (default)
16//! - `checkpointing` - DynamoDB checkpointing for consumers
17//! - `enhanced-fanout` - Enhanced fan-out consumer support
18//! - `compression` - Data compression support
19//!
20//! # Examples
21//!
22//! ## Kinesis Data Streams - Producer
23//!
24//! ```rust,no_run
25//! # #[cfg(feature = "streams")]
26//! # async fn example() -> oxigdal_kinesis::Result<()> {
27//! use oxigdal_kinesis::streams::{Producer, ProducerConfig, Record};
28//! use bytes::Bytes;
29//!
30//! // Create AWS Kinesis client
31//! let config = aws_config::load_from_env().await;
32//! let client = aws_sdk_kinesis::Client::new(&config);
33//!
34//! // Configure producer
35//! let producer_config = ProducerConfig::new("my-stream")
36//!     .with_buffer_size(1000)
37//!     .with_linger_ms(100);
38//!
39//! let producer = Producer::new(client, producer_config).await?;
40//!
41//! // Send records
42//! let record = Record::new("partition-key-1", Bytes::from("data"));
43//! producer.send(record).await?;
44//!
45//! // Flush pending records
46//! producer.flush().await?;
47//! # Ok(())
48//! # }
49//! ```
50//!
51//! ## Kinesis Data Streams - Consumer
52//!
53//! ```rust,no_run
54//! # #[cfg(feature = "streams")]
55//! # async fn example() -> oxigdal_kinesis::Result<()> {
56//! use oxigdal_kinesis::streams::{Consumer, ConsumerConfig};
57//!
58//! let config = aws_config::load_from_env().await;
59//! let client = aws_sdk_kinesis::Client::new(&config);
60//!
61//! let consumer_config = ConsumerConfig::new("my-stream")
62//!     .with_max_records(100);
63//!
64//! let mut consumer = Consumer::new(client, consumer_config, "shard-0001").await?;
65//!
66//! // Poll for records
67//! let records = consumer.poll().await?;
68//! for record in records {
69//!     println!("Received: {:?}", record.data);
70//! }
71//! # Ok(())
72//! # }
73//! ```
74//!
75//! ## Kinesis Firehose
76//!
77//! ```rust,no_run
78//! # #[cfg(feature = "firehose")]
79//! # async fn example() -> oxigdal_kinesis::Result<()> {
80//! use oxigdal_kinesis::firehose::{DeliveryStream, DeliveryStreamConfig, FirehoseRecord};
81//! use oxigdal_kinesis::firehose::destination::S3DestinationConfig;
82//! use bytes::Bytes;
83//!
84//! let config = aws_config::load_from_env().await;
85//! let client = aws_sdk_firehose::Client::new(&config);
86//!
87//! let s3_config = S3DestinationConfig::new(
88//!     "arn:aws:s3:::my-bucket",
89//!     "arn:aws:iam::123456789012:role/firehose-role",
90//!     "data/",
91//! );
92//!
93//! let stream_config = DeliveryStreamConfig::new("my-delivery-stream")
94//!     .with_s3_destination(s3_config);
95//!
96//! let mut delivery_stream = DeliveryStream::new(client, stream_config);
97//! delivery_stream.start().await?;
98//!
99//! // Send record
100//! let record = FirehoseRecord::new(Bytes::from("data"));
101//! delivery_stream.send_record(record).await?;
102//! # Ok(())
103//! # }
104//! ```
105//!
106//! ## Kinesis Analytics
107//!
108//! ```rust,no_run
109//! # #[cfg(feature = "analytics")]
110//! # async fn example() -> oxigdal_kinesis::Result<()> {
111//! use oxigdal_kinesis::analytics::sql::QueryBuilder;
112//!
113//! // Build SQL query
114//! let query = QueryBuilder::new()
115//!     .select("userId")
116//!     .select("COUNT(*) as event_count")
117//!     .from("SOURCE_SQL_STREAM")
118//!     .window("WINDOW TUMBLING (SIZE 1 MINUTE)")
119//!     .group_by("userId")
120//!     .build();
121//!
122//! println!("Query: {}", query.as_str());
123//! # Ok(())
124//! # }
125//! ```
126
127#![cfg_attr(not(feature = "std"), no_std)]
128#![warn(missing_docs)]
129
130#[cfg(feature = "alloc")]
131extern crate alloc;
132
133pub mod error;
134
135#[cfg(feature = "streams")]
136pub mod streams;
137
138#[cfg(feature = "firehose")]
139pub mod firehose;
140
141#[cfg(feature = "analytics")]
142pub mod analytics;
143
144#[cfg(feature = "monitoring")]
145pub mod monitoring;
146
147pub use error::{KinesisError, Result};
148
149/// Kinesis client wrapper providing access to all Kinesis services
150#[derive(Clone)]
151pub struct KinesisClient {
152    #[cfg(feature = "streams")]
153    streams: Option<streams::KinesisStreams>,
154
155    #[cfg(feature = "firehose")]
156    firehose: Option<firehose::KinesisFirehose>,
157
158    #[cfg(feature = "analytics")]
159    analytics: Option<analytics::KinesisAnalytics>,
160
161    #[cfg(feature = "monitoring")]
162    monitoring: Option<monitoring::KinesisMonitoring>,
163}
164
165impl KinesisClient {
166    /// Creates a new Kinesis client
167    pub fn new() -> Self {
168        Self {
169            #[cfg(feature = "streams")]
170            streams: None,
171            #[cfg(feature = "firehose")]
172            firehose: None,
173            #[cfg(feature = "analytics")]
174            analytics: None,
175            #[cfg(feature = "monitoring")]
176            monitoring: None,
177        }
178    }
179
180    /// Creates a new Kinesis client from environment
181    #[cfg(feature = "async")]
182    pub async fn from_env() -> Self {
183        Self {
184            #[cfg(feature = "streams")]
185            streams: None,
186            #[cfg(feature = "firehose")]
187            firehose: Some(firehose::KinesisFirehose::from_env().await),
188            #[cfg(feature = "analytics")]
189            analytics: Some(analytics::KinesisAnalytics::from_env().await),
190            #[cfg(feature = "monitoring")]
191            monitoring: Some(monitoring::KinesisMonitoring::from_env().await),
192        }
193    }
194
195    /// Sets the Kinesis Data Streams client
196    #[cfg(feature = "streams")]
197    pub fn with_streams(mut self, _stream_name: impl Into<String>) -> Self {
198        // This would be initialized with actual AWS client in real usage
199        self.streams = None; // Placeholder
200        self
201    }
202
203    /// Gets the Kinesis Data Streams client
204    #[cfg(feature = "streams")]
205    pub fn streams(&self) -> Option<&streams::KinesisStreams> {
206        self.streams.as_ref()
207    }
208
209    /// Gets the Kinesis Firehose client
210    #[cfg(feature = "firehose")]
211    pub fn firehose(&self) -> Option<&firehose::KinesisFirehose> {
212        self.firehose.as_ref()
213    }
214
215    /// Gets the Kinesis Analytics client
216    #[cfg(feature = "analytics")]
217    pub fn analytics(&self) -> Option<&analytics::KinesisAnalytics> {
218        self.analytics.as_ref()
219    }
220
221    /// Gets the monitoring client
222    #[cfg(feature = "monitoring")]
223    pub fn monitoring(&self) -> Option<&monitoring::KinesisMonitoring> {
224        self.monitoring.as_ref()
225    }
226}
227
228impl Default for KinesisClient {
229    fn default() -> Self {
230        Self::new()
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237
238    #[test]
239    fn test_kinesis_client_creation() {
240        let client = KinesisClient::new();
241        assert!(client.firehose().is_none());
242    }
243
244    #[test]
245    fn test_kinesis_client_default() {
246        let client = KinesisClient::default();
247        assert!(client.firehose().is_none());
248    }
249}