oxigdal_kinesis/lib.rs
1//! AWS Kinesis streaming integration for OxiGDAL
2//!
3//! This crate provides comprehensive AWS Kinesis integration for OxiGDAL, including:
4//!
5//! - **Kinesis Data Streams**: Producer with KPL patterns, enhanced fan-out consumer, shard management, DynamoDB checkpointing
6//! - **Kinesis Firehose**: Delivery streams with transformations, S3/Redshift/Elasticsearch destinations
7//! - **Kinesis Analytics**: SQL queries on streams, tumbling/sliding/session windows, real-time analytics
8//! - **Monitoring**: CloudWatch metrics, stream monitoring, alerting system
9//!
10//! # Features
11//!
12//! - `streams` - Kinesis Data Streams support (default)
13//! - `firehose` - Kinesis Firehose support (default)
14//! - `analytics` - Kinesis Analytics support (default)
15//! - `monitoring` - CloudWatch monitoring and metrics (default)
16//! - `checkpointing` - DynamoDB checkpointing for consumers
17//! - `enhanced-fanout` - Enhanced fan-out consumer support
18//! - `compression` - Data compression support
19//!
20//! # Examples
21//!
22//! ## Kinesis Data Streams - Producer
23//!
24//! ```rust,no_run
25//! # #[cfg(feature = "streams")]
26//! # async fn example() -> oxigdal_kinesis::Result<()> {
27//! use oxigdal_kinesis::streams::{Producer, ProducerConfig, Record};
28//! use bytes::Bytes;
29//!
30//! // Create AWS Kinesis client
31//! let config = aws_config::load_from_env().await;
32//! let client = aws_sdk_kinesis::Client::new(&config);
33//!
34//! // Configure producer
35//! let producer_config = ProducerConfig::new("my-stream")
36//! .with_buffer_size(1000)
37//! .with_linger_ms(100);
38//!
39//! let producer = Producer::new(client, producer_config).await?;
40//!
41//! // Send records
42//! let record = Record::new("partition-key-1", Bytes::from("data"));
43//! producer.send(record).await?;
44//!
45//! // Flush pending records
46//! producer.flush().await?;
47//! # Ok(())
48//! # }
49//! ```
50//!
51//! ## Kinesis Data Streams - Consumer
52//!
53//! ```rust,no_run
54//! # #[cfg(feature = "streams")]
55//! # async fn example() -> oxigdal_kinesis::Result<()> {
56//! use oxigdal_kinesis::streams::{Consumer, ConsumerConfig};
57//!
58//! let config = aws_config::load_from_env().await;
59//! let client = aws_sdk_kinesis::Client::new(&config);
60//!
61//! let consumer_config = ConsumerConfig::new("my-stream")
62//! .with_max_records(100);
63//!
64//! let mut consumer = Consumer::new(client, consumer_config, "shard-0001").await?;
65//!
66//! // Poll for records
67//! let records = consumer.poll().await?;
68//! for record in records {
69//! println!("Received: {:?}", record.data);
70//! }
71//! # Ok(())
72//! # }
73//! ```
74//!
75//! ## Kinesis Firehose
76//!
77//! ```rust,no_run
78//! # #[cfg(feature = "firehose")]
79//! # async fn example() -> oxigdal_kinesis::Result<()> {
80//! use oxigdal_kinesis::firehose::{DeliveryStream, DeliveryStreamConfig, FirehoseRecord};
81//! use oxigdal_kinesis::firehose::destination::S3DestinationConfig;
82//! use bytes::Bytes;
83//!
84//! let config = aws_config::load_from_env().await;
85//! let client = aws_sdk_firehose::Client::new(&config);
86//!
87//! let s3_config = S3DestinationConfig::new(
88//! "arn:aws:s3:::my-bucket",
89//! "arn:aws:iam::123456789012:role/firehose-role",
90//! "data/",
91//! );
92//!
93//! let stream_config = DeliveryStreamConfig::new("my-delivery-stream")
94//! .with_s3_destination(s3_config);
95//!
96//! let mut delivery_stream = DeliveryStream::new(client, stream_config);
97//! delivery_stream.start().await?;
98//!
99//! // Send record
100//! let record = FirehoseRecord::new(Bytes::from("data"));
101//! delivery_stream.send_record(record).await?;
102//! # Ok(())
103//! # }
104//! ```
105//!
106//! ## Kinesis Analytics
107//!
108//! ```rust,no_run
109//! # #[cfg(feature = "analytics")]
110//! # async fn example() -> oxigdal_kinesis::Result<()> {
111//! use oxigdal_kinesis::analytics::sql::QueryBuilder;
112//!
113//! // Build SQL query
114//! let query = QueryBuilder::new()
115//! .select("userId")
116//! .select("COUNT(*) as event_count")
117//! .from("SOURCE_SQL_STREAM")
118//! .window("WINDOW TUMBLING (SIZE 1 MINUTE)")
119//! .group_by("userId")
120//! .build();
121//!
122//! println!("Query: {}", query.as_str());
123//! # Ok(())
124//! # }
125//! ```
126
127#![cfg_attr(not(feature = "std"), no_std)]
128#![warn(missing_docs)]
129
130#[cfg(feature = "alloc")]
131extern crate alloc;
132
133pub mod error;
134
135#[cfg(feature = "streams")]
136pub mod streams;
137
138#[cfg(feature = "firehose")]
139pub mod firehose;
140
141#[cfg(feature = "analytics")]
142pub mod analytics;
143
144#[cfg(feature = "monitoring")]
145pub mod monitoring;
146
147pub use error::{KinesisError, Result};
148
149/// Kinesis client wrapper providing access to all Kinesis services
150#[derive(Clone)]
151pub struct KinesisClient {
152 #[cfg(feature = "streams")]
153 streams: Option<streams::KinesisStreams>,
154
155 #[cfg(feature = "firehose")]
156 firehose: Option<firehose::KinesisFirehose>,
157
158 #[cfg(feature = "analytics")]
159 analytics: Option<analytics::KinesisAnalytics>,
160
161 #[cfg(feature = "monitoring")]
162 monitoring: Option<monitoring::KinesisMonitoring>,
163}
164
165impl KinesisClient {
166 /// Creates a new Kinesis client
167 pub fn new() -> Self {
168 Self {
169 #[cfg(feature = "streams")]
170 streams: None,
171 #[cfg(feature = "firehose")]
172 firehose: None,
173 #[cfg(feature = "analytics")]
174 analytics: None,
175 #[cfg(feature = "monitoring")]
176 monitoring: None,
177 }
178 }
179
180 /// Creates a new Kinesis client from environment
181 #[cfg(feature = "async")]
182 pub async fn from_env() -> Self {
183 Self {
184 #[cfg(feature = "streams")]
185 streams: None,
186 #[cfg(feature = "firehose")]
187 firehose: Some(firehose::KinesisFirehose::from_env().await),
188 #[cfg(feature = "analytics")]
189 analytics: Some(analytics::KinesisAnalytics::from_env().await),
190 #[cfg(feature = "monitoring")]
191 monitoring: Some(monitoring::KinesisMonitoring::from_env().await),
192 }
193 }
194
195 /// Sets the Kinesis Data Streams client
196 #[cfg(feature = "streams")]
197 pub fn with_streams(mut self, _stream_name: impl Into<String>) -> Self {
198 // This would be initialized with actual AWS client in real usage
199 self.streams = None; // Placeholder
200 self
201 }
202
203 /// Gets the Kinesis Data Streams client
204 #[cfg(feature = "streams")]
205 pub fn streams(&self) -> Option<&streams::KinesisStreams> {
206 self.streams.as_ref()
207 }
208
209 /// Gets the Kinesis Firehose client
210 #[cfg(feature = "firehose")]
211 pub fn firehose(&self) -> Option<&firehose::KinesisFirehose> {
212 self.firehose.as_ref()
213 }
214
215 /// Gets the Kinesis Analytics client
216 #[cfg(feature = "analytics")]
217 pub fn analytics(&self) -> Option<&analytics::KinesisAnalytics> {
218 self.analytics.as_ref()
219 }
220
221 /// Gets the monitoring client
222 #[cfg(feature = "monitoring")]
223 pub fn monitoring(&self) -> Option<&monitoring::KinesisMonitoring> {
224 self.monitoring.as_ref()
225 }
226}
227
228impl Default for KinesisClient {
229 fn default() -> Self {
230 Self::new()
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237
238 #[test]
239 fn test_kinesis_client_creation() {
240 let client = KinesisClient::new();
241 assert!(client.firehose().is_none());
242 }
243
244 #[test]
245 fn test_kinesis_client_default() {
246 let client = KinesisClient::default();
247 assert!(client.firehose().is_none());
248 }
249}