Skip to main content

oxigdal_streaming/core/
stream.rs

1//! Core stream types and traits.
2
3use crate::error::{Result, StreamingError};
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::RwLock;
11
12/// A stream element containing data and metadata.
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct StreamElement {
15    /// The actual data payload
16    pub data: Vec<u8>,
17
18    /// Event timestamp
19    pub event_time: DateTime<Utc>,
20
21    /// Processing timestamp
22    pub processing_time: DateTime<Utc>,
23
24    /// Optional key for partitioning
25    pub key: Option<Vec<u8>>,
26
27    /// Metadata
28    pub metadata: StreamMetadata,
29}
30
31impl StreamElement {
32    /// Create a new stream element.
33    pub fn new(data: Vec<u8>, event_time: DateTime<Utc>) -> Self {
34        Self {
35            data,
36            event_time,
37            processing_time: Utc::now(),
38            key: None,
39            metadata: StreamMetadata::default(),
40        }
41    }
42
43    /// Create a new stream element with a key.
44    pub fn with_key(mut self, key: Vec<u8>) -> Self {
45        self.key = Some(key);
46        self
47    }
48
49    /// Create a new stream element with metadata.
50    pub fn with_metadata(mut self, metadata: StreamMetadata) -> Self {
51        self.metadata = metadata;
52        self
53    }
54
55    /// Get the size in bytes of this element.
56    pub fn size_bytes(&self) -> usize {
57        self.data.len() + self.key.as_ref().map_or(0, |k| k.len())
58    }
59}
60
61/// Metadata associated with a stream element.
62#[derive(Debug, Clone, Default, Serialize, Deserialize)]
63pub struct StreamMetadata {
64    /// Source identifier
65    pub source_id: Option<String>,
66
67    /// Partition ID
68    pub partition_id: Option<u32>,
69
70    /// Sequence number
71    pub sequence_number: Option<u64>,
72
73    /// Custom attributes
74    pub attributes: std::collections::HashMap<String, String>,
75}
76
77/// Message types in a stream.
78#[derive(Debug, Clone)]
79pub enum StreamMessage {
80    /// Data element
81    Data(StreamElement),
82
83    /// Watermark for event time progress
84    Watermark(DateTime<Utc>),
85
86    /// Checkpoint barrier
87    Checkpoint(u64),
88
89    /// End of stream marker
90    EndOfStream,
91}
92
93/// Configuration for a stream.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct StreamConfig {
96    /// Buffer size for the stream
97    pub buffer_size: usize,
98
99    /// Whether to use bounded channels
100    pub bounded: bool,
101
102    /// Timeout for operations
103    pub timeout: Duration,
104
105    /// Enable checkpointing
106    pub enable_checkpointing: bool,
107
108    /// Checkpoint interval
109    pub checkpoint_interval: Duration,
110
111    /// Parallelism level
112    pub parallelism: usize,
113}
114
115impl Default for StreamConfig {
116    fn default() -> Self {
117        Self {
118            buffer_size: 1024,
119            bounded: true,
120            timeout: Duration::from_secs(30),
121            enable_checkpointing: false,
122            checkpoint_interval: Duration::from_secs(60),
123            parallelism: std::thread::available_parallelism()
124                .map(|n| n.get())
125                .unwrap_or(1),
126        }
127    }
128}
129
130/// A source that produces stream elements.
131#[async_trait]
132pub trait StreamSource: Send + Sync {
133    /// Read the next element from the source.
134    async fn next(&mut self) -> Result<Option<StreamMessage>>;
135
136    /// Check if the source has more elements.
137    async fn has_next(&self) -> bool;
138
139    /// Close the source.
140    async fn close(&mut self) -> Result<()>;
141}
142
143/// A sink that consumes stream elements.
144#[async_trait]
145pub trait StreamSink: Send + Sync {
146    /// Write an element to the sink.
147    async fn write(&mut self, element: StreamMessage) -> Result<()>;
148
149    /// Flush buffered elements.
150    async fn flush(&mut self) -> Result<()>;
151
152    /// Close the sink.
153    async fn close(&mut self) -> Result<()>;
154}
155
156/// A stream of data elements with transformation capabilities.
157pub struct Stream {
158    /// Configuration
159    config: StreamConfig,
160
161    /// Sender for stream messages
162    sender: Sender<StreamMessage>,
163
164    /// Receiver for stream messages
165    receiver: Receiver<StreamMessage>,
166
167    /// Stream state
168    state: Arc<RwLock<StreamState>>,
169}
170
171/// Internal state of a stream.
172#[derive(Debug)]
173struct StreamState {
174    /// Is the stream closed?
175    closed: bool,
176
177    /// Current watermark
178    watermark: Option<DateTime<Utc>>,
179
180    /// Last checkpoint ID
181    last_checkpoint: Option<u64>,
182
183    /// Total elements processed
184    elements_processed: u64,
185
186    /// Total bytes processed
187    bytes_processed: u64,
188}
189
190impl Stream {
191    /// Create a new stream with default configuration.
192    pub fn new() -> Self {
193        Self::with_config(StreamConfig::default())
194    }
195
196    /// Create a new stream with custom configuration.
197    pub fn with_config(config: StreamConfig) -> Self {
198        let (sender, receiver) = if config.bounded {
199            bounded(config.buffer_size)
200        } else {
201            unbounded()
202        };
203
204        Self {
205            config,
206            sender,
207            receiver,
208            state: Arc::new(RwLock::new(StreamState {
209                closed: false,
210                watermark: None,
211                last_checkpoint: None,
212                elements_processed: 0,
213                bytes_processed: 0,
214            })),
215        }
216    }
217
218    /// Send a message to the stream.
219    pub async fn send(&self, message: StreamMessage) -> Result<()> {
220        let state = self.state.read().await;
221        if state.closed {
222            return Err(StreamingError::StreamClosed);
223        }
224        drop(state);
225
226        self.sender
227            .send(message)
228            .map_err(|_| StreamingError::SendError)?;
229
230        Ok(())
231    }
232
233    /// Receive a message from the stream.
234    pub async fn recv(&self) -> Result<StreamMessage> {
235        match self.receiver.recv_timeout(self.config.timeout) {
236            Ok(msg) => {
237                // Update state
238                let mut state = self.state.write().await;
239                match &msg {
240                    StreamMessage::Data(elem) => {
241                        state.elements_processed += 1;
242                        state.bytes_processed += elem.size_bytes() as u64;
243                    }
244                    StreamMessage::Watermark(wm) => {
245                        state.watermark = Some(*wm);
246                    }
247                    StreamMessage::Checkpoint(id) => {
248                        state.last_checkpoint = Some(*id);
249                    }
250                    StreamMessage::EndOfStream => {
251                        state.closed = true;
252                    }
253                }
254                Ok(msg)
255            }
256            Err(crossbeam_channel::RecvTimeoutError::Timeout) => Err(StreamingError::Timeout),
257            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
258                Err(StreamingError::RecvError)
259            }
260        }
261    }
262
263    /// Try to receive a message without blocking.
264    pub fn try_recv(&self) -> Result<Option<StreamMessage>> {
265        match self.receiver.try_recv() {
266            Ok(msg) => Ok(Some(msg)),
267            Err(crossbeam_channel::TryRecvError::Empty) => Ok(None),
268            Err(crossbeam_channel::TryRecvError::Disconnected) => Err(StreamingError::RecvError),
269        }
270    }
271
272    /// Get the current watermark.
273    pub async fn watermark(&self) -> Option<DateTime<Utc>> {
274        self.state.read().await.watermark
275    }
276
277    /// Get the last checkpoint ID.
278    pub async fn last_checkpoint(&self) -> Option<u64> {
279        self.state.read().await.last_checkpoint
280    }
281
282    /// Get the number of elements processed.
283    pub async fn elements_processed(&self) -> u64 {
284        self.state.read().await.elements_processed
285    }
286
287    /// Get the total bytes processed.
288    pub async fn bytes_processed(&self) -> u64 {
289        self.state.read().await.bytes_processed
290    }
291
292    /// Check if the stream is closed.
293    pub async fn is_closed(&self) -> bool {
294        self.state.read().await.closed
295    }
296
297    /// Close the stream.
298    pub async fn close(&self) -> Result<()> {
299        let mut state = self.state.write().await;
300        state.closed = true;
301        Ok(())
302    }
303
304    /// Get a clone of the sender.
305    pub fn sender(&self) -> Sender<StreamMessage> {
306        self.sender.clone()
307    }
308
309    /// Get a clone of the receiver.
310    pub fn receiver(&self) -> Receiver<StreamMessage> {
311        self.receiver.clone()
312    }
313
314    /// Get the stream configuration.
315    pub fn config(&self) -> &StreamConfig {
316        &self.config
317    }
318}
319
320impl Default for Stream {
321    fn default() -> Self {
322        Self::new()
323    }
324}
325
326/// A channel-based stream source.
327pub struct ChannelSource {
328    receiver: Receiver<StreamMessage>,
329    closed: bool,
330}
331
332impl ChannelSource {
333    /// Create a new channel source.
334    pub fn new(receiver: Receiver<StreamMessage>) -> Self {
335        Self {
336            receiver,
337            closed: false,
338        }
339    }
340}
341
342#[async_trait]
343impl StreamSource for ChannelSource {
344    async fn next(&mut self) -> Result<Option<StreamMessage>> {
345        if self.closed {
346            return Ok(None);
347        }
348
349        match self.receiver.try_recv() {
350            Ok(msg) => {
351                if matches!(msg, StreamMessage::EndOfStream) {
352                    self.closed = true;
353                }
354                Ok(Some(msg))
355            }
356            Err(crossbeam_channel::TryRecvError::Empty) => Ok(None),
357            Err(crossbeam_channel::TryRecvError::Disconnected) => {
358                self.closed = true;
359                Ok(None)
360            }
361        }
362    }
363
364    async fn has_next(&self) -> bool {
365        !self.closed && !self.receiver.is_empty()
366    }
367
368    async fn close(&mut self) -> Result<()> {
369        self.closed = true;
370        Ok(())
371    }
372}
373
374/// A channel-based stream sink.
375pub struct ChannelSink {
376    sender: Sender<StreamMessage>,
377    buffer: Vec<StreamMessage>,
378    buffer_size: usize,
379}
380
381impl ChannelSink {
382    /// Create a new channel sink.
383    pub fn new(sender: Sender<StreamMessage>) -> Self {
384        Self::with_buffer_size(sender, 100)
385    }
386
387    /// Create a new channel sink with a custom buffer size.
388    pub fn with_buffer_size(sender: Sender<StreamMessage>, buffer_size: usize) -> Self {
389        Self {
390            sender,
391            buffer: Vec::with_capacity(buffer_size),
392            buffer_size,
393        }
394    }
395}
396
397#[async_trait]
398impl StreamSink for ChannelSink {
399    async fn write(&mut self, element: StreamMessage) -> Result<()> {
400        self.buffer.push(element);
401
402        if self.buffer.len() >= self.buffer_size {
403            self.flush().await?;
404        }
405
406        Ok(())
407    }
408
409    async fn flush(&mut self) -> Result<()> {
410        for msg in self.buffer.drain(..) {
411            self.sender
412                .send(msg)
413                .map_err(|_| StreamingError::SendError)?;
414        }
415        Ok(())
416    }
417
418    async fn close(&mut self) -> Result<()> {
419        self.flush().await?;
420        self.sender
421            .send(StreamMessage::EndOfStream)
422            .map_err(|_| StreamingError::SendError)?;
423        Ok(())
424    }
425}
426
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh element keeps its payload and event time and has no key.
    #[tokio::test]
    async fn test_stream_element_creation() {
        let event_time = Utc::now();
        let payload = vec![1, 2, 3, 4];
        let element = StreamElement::new(payload.clone(), event_time);

        assert_eq!(element.data, payload);
        assert_eq!(element.event_time, event_time);
        assert!(element.key.is_none());
    }

    /// A data message sent into a stream comes back out unchanged.
    #[tokio::test]
    async fn test_stream_send_recv() {
        let stream = Stream::new();
        let payload = vec![1, 2, 3];
        let element = StreamElement::new(payload.clone(), Utc::now());

        stream
            .send(StreamMessage::Data(element))
            .await
            .expect("stream send should succeed");

        let received = stream.recv().await.expect("stream recv should succeed");
        if let StreamMessage::Data(got) = received {
            assert_eq!(got.data, payload);
        } else {
            panic!("Expected data message");
        }
    }

    /// Receiving a watermark message records it on the stream.
    #[tokio::test]
    async fn test_stream_watermark() {
        let stream = Stream::new();
        let mark = Utc::now();

        stream
            .send(StreamMessage::Watermark(mark))
            .await
            .expect("stream send should succeed");
        let _ = stream.recv().await.expect("stream recv should succeed");

        let recorded = stream.watermark().await;
        assert_eq!(recorded, Some(mark));
    }

    /// Closing a stream flips the closed flag and rejects later sends.
    #[tokio::test]
    async fn test_stream_close() {
        let stream = Stream::new();
        assert!(!stream.is_closed().await);

        stream.close().await.expect("stream close should succeed");
        assert!(stream.is_closed().await);

        let outcome = stream.send(StreamMessage::EndOfStream).await;
        assert!(matches!(outcome, Err(StreamingError::StreamClosed)));
    }
}
486}