1use crate::error::{Result, StreamingError};
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::RwLock;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct StreamElement {
15 pub data: Vec<u8>,
17
18 pub event_time: DateTime<Utc>,
20
21 pub processing_time: DateTime<Utc>,
23
24 pub key: Option<Vec<u8>>,
26
27 pub metadata: StreamMetadata,
29}
30
31impl StreamElement {
32 pub fn new(data: Vec<u8>, event_time: DateTime<Utc>) -> Self {
34 Self {
35 data,
36 event_time,
37 processing_time: Utc::now(),
38 key: None,
39 metadata: StreamMetadata::default(),
40 }
41 }
42
43 pub fn with_key(mut self, key: Vec<u8>) -> Self {
45 self.key = Some(key);
46 self
47 }
48
49 pub fn with_metadata(mut self, metadata: StreamMetadata) -> Self {
51 self.metadata = metadata;
52 self
53 }
54
55 pub fn size_bytes(&self) -> usize {
57 self.data.len() + self.key.as_ref().map_or(0, |k| k.len())
58 }
59}
60
61#[derive(Debug, Clone, Default, Serialize, Deserialize)]
63pub struct StreamMetadata {
64 pub source_id: Option<String>,
66
67 pub partition_id: Option<u32>,
69
70 pub sequence_number: Option<u64>,
72
73 pub attributes: std::collections::HashMap<String, String>,
75}
76
77#[derive(Debug, Clone)]
79pub enum StreamMessage {
80 Data(StreamElement),
82
83 Watermark(DateTime<Utc>),
85
86 Checkpoint(u64),
88
89 EndOfStream,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct StreamConfig {
96 pub buffer_size: usize,
98
99 pub bounded: bool,
101
102 pub timeout: Duration,
104
105 pub enable_checkpointing: bool,
107
108 pub checkpoint_interval: Duration,
110
111 pub parallelism: usize,
113}
114
115impl Default for StreamConfig {
116 fn default() -> Self {
117 Self {
118 buffer_size: 1024,
119 bounded: true,
120 timeout: Duration::from_secs(30),
121 enable_checkpointing: false,
122 checkpoint_interval: Duration::from_secs(60),
123 parallelism: std::thread::available_parallelism()
124 .map(|n| n.get())
125 .unwrap_or(1),
126 }
127 }
128}
129
130#[async_trait]
132pub trait StreamSource: Send + Sync {
133 async fn next(&mut self) -> Result<Option<StreamMessage>>;
135
136 async fn has_next(&self) -> bool;
138
139 async fn close(&mut self) -> Result<()>;
141}
142
143#[async_trait]
145pub trait StreamSink: Send + Sync {
146 async fn write(&mut self, element: StreamMessage) -> Result<()>;
148
149 async fn flush(&mut self) -> Result<()>;
151
152 async fn close(&mut self) -> Result<()>;
154}
155
156pub struct Stream {
158 config: StreamConfig,
160
161 sender: Sender<StreamMessage>,
163
164 receiver: Receiver<StreamMessage>,
166
167 state: Arc<RwLock<StreamState>>,
169}
170
171#[derive(Debug)]
173struct StreamState {
174 closed: bool,
176
177 watermark: Option<DateTime<Utc>>,
179
180 last_checkpoint: Option<u64>,
182
183 elements_processed: u64,
185
186 bytes_processed: u64,
188}
189
190impl Stream {
191 pub fn new() -> Self {
193 Self::with_config(StreamConfig::default())
194 }
195
196 pub fn with_config(config: StreamConfig) -> Self {
198 let (sender, receiver) = if config.bounded {
199 bounded(config.buffer_size)
200 } else {
201 unbounded()
202 };
203
204 Self {
205 config,
206 sender,
207 receiver,
208 state: Arc::new(RwLock::new(StreamState {
209 closed: false,
210 watermark: None,
211 last_checkpoint: None,
212 elements_processed: 0,
213 bytes_processed: 0,
214 })),
215 }
216 }
217
218 pub async fn send(&self, message: StreamMessage) -> Result<()> {
220 let state = self.state.read().await;
221 if state.closed {
222 return Err(StreamingError::StreamClosed);
223 }
224 drop(state);
225
226 self.sender
227 .send(message)
228 .map_err(|_| StreamingError::SendError)?;
229
230 Ok(())
231 }
232
233 pub async fn recv(&self) -> Result<StreamMessage> {
235 match self.receiver.recv_timeout(self.config.timeout) {
236 Ok(msg) => {
237 let mut state = self.state.write().await;
239 match &msg {
240 StreamMessage::Data(elem) => {
241 state.elements_processed += 1;
242 state.bytes_processed += elem.size_bytes() as u64;
243 }
244 StreamMessage::Watermark(wm) => {
245 state.watermark = Some(*wm);
246 }
247 StreamMessage::Checkpoint(id) => {
248 state.last_checkpoint = Some(*id);
249 }
250 StreamMessage::EndOfStream => {
251 state.closed = true;
252 }
253 }
254 Ok(msg)
255 }
256 Err(crossbeam_channel::RecvTimeoutError::Timeout) => Err(StreamingError::Timeout),
257 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
258 Err(StreamingError::RecvError)
259 }
260 }
261 }
262
263 pub fn try_recv(&self) -> Result<Option<StreamMessage>> {
265 match self.receiver.try_recv() {
266 Ok(msg) => Ok(Some(msg)),
267 Err(crossbeam_channel::TryRecvError::Empty) => Ok(None),
268 Err(crossbeam_channel::TryRecvError::Disconnected) => Err(StreamingError::RecvError),
269 }
270 }
271
272 pub async fn watermark(&self) -> Option<DateTime<Utc>> {
274 self.state.read().await.watermark
275 }
276
277 pub async fn last_checkpoint(&self) -> Option<u64> {
279 self.state.read().await.last_checkpoint
280 }
281
282 pub async fn elements_processed(&self) -> u64 {
284 self.state.read().await.elements_processed
285 }
286
287 pub async fn bytes_processed(&self) -> u64 {
289 self.state.read().await.bytes_processed
290 }
291
292 pub async fn is_closed(&self) -> bool {
294 self.state.read().await.closed
295 }
296
297 pub async fn close(&self) -> Result<()> {
299 let mut state = self.state.write().await;
300 state.closed = true;
301 Ok(())
302 }
303
304 pub fn sender(&self) -> Sender<StreamMessage> {
306 self.sender.clone()
307 }
308
309 pub fn receiver(&self) -> Receiver<StreamMessage> {
311 self.receiver.clone()
312 }
313
314 pub fn config(&self) -> &StreamConfig {
316 &self.config
317 }
318}
319
320impl Default for Stream {
321 fn default() -> Self {
322 Self::new()
323 }
324}
325
326pub struct ChannelSource {
328 receiver: Receiver<StreamMessage>,
329 closed: bool,
330}
331
332impl ChannelSource {
333 pub fn new(receiver: Receiver<StreamMessage>) -> Self {
335 Self {
336 receiver,
337 closed: false,
338 }
339 }
340}
341
342#[async_trait]
343impl StreamSource for ChannelSource {
344 async fn next(&mut self) -> Result<Option<StreamMessage>> {
345 if self.closed {
346 return Ok(None);
347 }
348
349 match self.receiver.try_recv() {
350 Ok(msg) => {
351 if matches!(msg, StreamMessage::EndOfStream) {
352 self.closed = true;
353 }
354 Ok(Some(msg))
355 }
356 Err(crossbeam_channel::TryRecvError::Empty) => Ok(None),
357 Err(crossbeam_channel::TryRecvError::Disconnected) => {
358 self.closed = true;
359 Ok(None)
360 }
361 }
362 }
363
364 async fn has_next(&self) -> bool {
365 !self.closed && !self.receiver.is_empty()
366 }
367
368 async fn close(&mut self) -> Result<()> {
369 self.closed = true;
370 Ok(())
371 }
372}
373
374pub struct ChannelSink {
376 sender: Sender<StreamMessage>,
377 buffer: Vec<StreamMessage>,
378 buffer_size: usize,
379}
380
381impl ChannelSink {
382 pub fn new(sender: Sender<StreamMessage>) -> Self {
384 Self::with_buffer_size(sender, 100)
385 }
386
387 pub fn with_buffer_size(sender: Sender<StreamMessage>, buffer_size: usize) -> Self {
389 Self {
390 sender,
391 buffer: Vec::with_capacity(buffer_size),
392 buffer_size,
393 }
394 }
395}
396
397#[async_trait]
398impl StreamSink for ChannelSink {
399 async fn write(&mut self, element: StreamMessage) -> Result<()> {
400 self.buffer.push(element);
401
402 if self.buffer.len() >= self.buffer_size {
403 self.flush().await?;
404 }
405
406 Ok(())
407 }
408
409 async fn flush(&mut self) -> Result<()> {
410 for msg in self.buffer.drain(..) {
411 self.sender
412 .send(msg)
413 .map_err(|_| StreamingError::SendError)?;
414 }
415 Ok(())
416 }
417
418 async fn close(&mut self) -> Result<()> {
419 self.flush().await?;
420 self.sender
421 .send(StreamMessage::EndOfStream)
422 .map_err(|_| StreamingError::SendError)?;
423 Ok(())
424 }
425}
426
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built element keeps its payload and event time and has no key.
    #[tokio::test]
    async fn test_stream_element_creation() {
        let event_time = Utc::now();
        let payload = vec![1, 2, 3, 4];

        let element = StreamElement::new(payload.clone(), event_time);

        assert_eq!(element.data, payload);
        assert_eq!(element.event_time, event_time);
        assert!(element.key.is_none());
    }

    /// A data message round-trips through a single stream.
    #[tokio::test]
    async fn test_stream_send_recv() {
        let stream = Stream::new();
        let sent = StreamElement::new(vec![1, 2, 3], Utc::now());

        let outgoing = StreamMessage::Data(sent.clone());
        stream
            .send(outgoing)
            .await
            .expect("stream send should succeed");

        let incoming = stream.recv().await.expect("stream recv should succeed");
        if let StreamMessage::Data(received) = incoming {
            assert_eq!(received.data, sent.data);
        } else {
            panic!("Expected data message");
        }
    }

    /// Receiving a watermark records it on the stream.
    #[tokio::test]
    async fn test_stream_watermark() {
        let stream = Stream::new();
        let mark = Utc::now();

        stream
            .send(StreamMessage::Watermark(mark))
            .await
            .expect("stream send should succeed");
        stream.recv().await.expect("stream recv should succeed");

        let observed = stream.watermark().await;
        assert_eq!(observed, Some(mark));
    }

    /// Closing the stream flips the flag and rejects further sends.
    #[tokio::test]
    async fn test_stream_close() {
        let stream = Stream::new();
        assert!(!stream.is_closed().await);

        stream.close().await.expect("stream close should succeed");
        assert!(stream.is_closed().await);

        let outcome = stream.send(StreamMessage::EndOfStream).await;
        assert!(matches!(outcome, Err(StreamingError::StreamClosed)));
    }
}