use crate::error::{Result, StreamingError};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
/// One record travelling through a stream: a byte payload together with its
/// timestamps, an optional key, and descriptive metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamElement {
/// Raw payload bytes.
pub data: Vec<u8>,
/// Event timestamp; `StreamElement::new` stores the value given by its caller.
pub event_time: DateTime<Utc>,
/// Processing timestamp; `StreamElement::new` sets it to `Utc::now()`.
pub processing_time: DateTime<Utc>,
/// Optional key bytes; `None` after `new`, filled in by `with_key`.
pub key: Option<Vec<u8>>,
/// Descriptive metadata; `StreamMetadata::default()` after `new`.
pub metadata: StreamMetadata,
}
impl StreamElement {
    /// Builds an element from a payload and its event time.
    ///
    /// The processing time is stamped with the current UTC time, the key is
    /// left unset, and the metadata starts out as the default value.
    pub fn new(data: Vec<u8>, event_time: DateTime<Utc>) -> Self {
        let processing_time = Utc::now();
        Self {
            data,
            event_time,
            processing_time,
            key: None,
            metadata: StreamMetadata::default(),
        }
    }

    /// Returns the element with its key replaced by `key`.
    pub fn with_key(self, key: Vec<u8>) -> Self {
        Self {
            key: Some(key),
            ..self
        }
    }

    /// Returns the element with its metadata replaced by `metadata`.
    pub fn with_metadata(self, metadata: StreamMetadata) -> Self {
        Self { metadata, ..self }
    }

    /// Returns the payload length plus the key length (zero when no key is
    /// set). Metadata is not counted.
    pub fn size_bytes(&self) -> usize {
        let key_len = match &self.key {
            Some(key) => key.len(),
            None => 0,
        };
        self.data.len() + key_len
    }
}
/// Optional descriptive fields attached to a `StreamElement`.
///
/// The derived `Default` leaves every option as `None` and the attribute map
/// empty. None of these fields is read elsewhere in this part of the file.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StreamMetadata {
/// Identifier of the producing source, if any.
pub source_id: Option<String>,
/// Partition identifier, if any.
pub partition_id: Option<u32>,
/// Sequence number, if any.
/// NOTE(review): presumably per-source or per-partition ordering — confirm with producers.
pub sequence_number: Option<u64>,
/// Free-form string key/value attributes.
pub attributes: std::collections::HashMap<String, String>,
}
/// Everything that can travel over a stream channel: data plus control signals.
#[derive(Debug, Clone)]
pub enum StreamMessage {
/// A data record; `Stream::recv` counts it and its `size_bytes()`.
Data(StreamElement),
/// A watermark timestamp; `Stream::recv` stores it as the current watermark.
Watermark(DateTime<Utc>),
/// A checkpoint identifier; `Stream::recv` stores it as the last checkpoint.
Checkpoint(u64),
/// End-of-stream marker; receiving it marks a `Stream` or `ChannelSource` as closed.
EndOfStream,
}
/// Settings a `Stream` is built from. Default values are those set by the
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
/// Capacity handed to `bounded()` by `Stream::with_config` when `bounded` is true. Default: 1024.
pub buffer_size: usize,
/// Chooses a bounded (`true`) or unbounded (`false`) channel in `Stream::with_config`. Default: true.
pub bounded: bool,
/// Wait limit for blocking stream operations; `Stream::recv` passes it to `recv_timeout`. Default: 30 seconds.
pub timeout: Duration,
/// Checkpointing switch; not read anywhere in this part of the file. Default: false.
pub enable_checkpointing: bool,
/// NOTE(review): presumably the spacing between checkpoints — not read in this part of the file; confirm with its consumers. Default: 60 seconds.
pub checkpoint_interval: Duration,
/// NOTE(review): presumably a worker or partition count — not read in this part of the file; confirm with its consumers.
/// Default: `std::thread::available_parallelism()`, or 1 when that query fails.
pub parallelism: usize,
}
impl Default for StreamConfig {
    /// Produces the baseline configuration: a bounded 1024-slot buffer, a
    /// 30-second timeout, checkpointing switched off with a 60-second
    /// interval, and parallelism taken from the number of threads the system
    /// reports as available (1 when that query fails).
    fn default() -> Self {
        let parallelism = match std::thread::available_parallelism() {
            Ok(threads) => threads.get(),
            Err(_) => 1,
        };
        Self {
            buffer_size: 1024,
            bounded: true,
            timeout: Duration::from_secs(30),
            enable_checkpointing: false,
            checkpoint_interval: Duration::from_secs(60),
            parallelism,
        }
    }
}
/// Pull-style producer of `StreamMessage`s.
#[async_trait]
pub trait StreamSource: Send + Sync {
/// Fetches the next message, or `Ok(None)` when this call produced none.
/// NOTE(review): the trait does not say whether `None` means "exhausted" or
/// "nothing available yet"; `ChannelSource` returns it in both situations.
async fn next(&mut self) -> Result<Option<StreamMessage>>;
/// Reports whether the source has a message to hand out. In `ChannelSource`
/// this is true when the source is not closed and its channel is non-empty.
async fn has_next(&self) -> bool;
/// Closes the source.
async fn close(&mut self) -> Result<()>;
}
/// Push-style consumer of `StreamMessage`s.
#[async_trait]
pub trait StreamSink: Send + Sync {
/// Accepts one message. Implementations may buffer it, as `ChannelSink` does.
async fn write(&mut self, element: StreamMessage) -> Result<()>;
/// Pushes out anything the sink has buffered.
async fn flush(&mut self) -> Result<()>;
/// Closes the sink. `ChannelSink` flushes and then emits `StreamMessage::EndOfStream`.
async fn close(&mut self) -> Result<()>;
}
/// In-process message stream that owns both ends of a crossbeam channel along
/// with shared bookkeeping about what has been received.
pub struct Stream {
/// Settings the stream was built with.
config: StreamConfig,
/// Producer end of the channel.
sender: Sender<StreamMessage>,
/// Consumer end of the channel.
receiver: Receiver<StreamMessage>,
/// Bookkeeping (closed flag, watermark, checkpoint, counters) behind an async `RwLock`.
state: Arc<RwLock<StreamState>>,
}
/// Mutable bookkeeping of a `Stream`, updated by `Stream::recv` and `Stream::close`.
#[derive(Debug)]
struct StreamState {
/// Set by `Stream::close` and when `Stream::recv` sees `EndOfStream`; makes `Stream::send` fail.
closed: bool,
/// Timestamp of the most recent `Watermark` message seen by `Stream::recv`.
watermark: Option<DateTime<Utc>>,
/// Identifier of the most recent `Checkpoint` message seen by `Stream::recv`.
last_checkpoint: Option<u64>,
/// Number of `Data` messages seen by `Stream::recv`.
elements_processed: u64,
/// Sum of `size_bytes()` over the `Data` messages seen by `Stream::recv`.
bytes_processed: u64,
}
impl Stream {
    /// Creates a stream configured with `StreamConfig::default()`.
    pub fn new() -> Self {
        Self::with_config(StreamConfig::default())
    }

    /// Creates a stream from `config`.
    ///
    /// The backing channel is `bounded(config.buffer_size)` when
    /// `config.bounded` is set and `unbounded()` otherwise. The stream starts
    /// open, with no watermark or checkpoint and both counters at zero.
    pub fn with_config(config: StreamConfig) -> Self {
        let (sender, receiver) = if config.bounded {
            bounded(config.buffer_size)
        } else {
            unbounded()
        };
        let state = StreamState {
            closed: false,
            watermark: None,
            last_checkpoint: None,
            elements_processed: 0,
            bytes_processed: 0,
        };
        Self {
            config,
            sender,
            receiver,
            state: Arc::new(RwLock::new(state)),
        }
    }

    /// Enqueues `message` on the stream.
    ///
    /// The wait for buffer space is limited to `config.timeout`, mirroring
    /// [`Stream::recv`]. Without that limit a full bounded buffer (or a
    /// zero-capacity one with no concurrent receiver) would park the calling
    /// thread indefinitely. The wait itself is still synchronous.
    ///
    /// # Errors
    ///
    /// * `StreamingError::StreamClosed` if the stream is marked closed.
    /// * `StreamingError::Timeout` if no buffer slot frees up within
    ///   `config.timeout`.
    /// * `StreamingError::SendError` if the channel is disconnected.
    pub async fn send(&self, message: StreamMessage) -> Result<()> {
        // The read guard is a temporary of this statement, so the lock is
        // released before the potentially slow channel operation below.
        let closed = self.state.read().await.closed;
        if closed {
            return Err(StreamingError::StreamClosed);
        }
        self.sender
            .send_timeout(message, self.config.timeout)
            .map_err(|err| match err {
                crossbeam_channel::SendTimeoutError::Timeout(_) => StreamingError::Timeout,
                crossbeam_channel::SendTimeoutError::Disconnected(_) => StreamingError::SendError,
            })
    }

    /// Receives the next message, waiting up to `config.timeout`, and records
    /// it in the stream's bookkeeping.
    ///
    /// `Data` bumps the element and byte counters, `Watermark` and
    /// `Checkpoint` replace the stored values, and `EndOfStream` marks the
    /// stream closed. The message is returned in every case.
    ///
    /// NOTE: `recv_timeout` is a synchronous wait, so the calling thread (not
    /// just the task) is occupied until a message arrives or the timeout ends.
    ///
    /// # Errors
    ///
    /// * `StreamingError::Timeout` if nothing arrives within `config.timeout`.
    /// * `StreamingError::RecvError` if the channel is disconnected.
    pub async fn recv(&self) -> Result<StreamMessage> {
        let msg = self
            .receiver
            .recv_timeout(self.config.timeout)
            .map_err(|err| match err {
                crossbeam_channel::RecvTimeoutError::Timeout => StreamingError::Timeout,
                crossbeam_channel::RecvTimeoutError::Disconnected => StreamingError::RecvError,
            })?;

        let mut state = self.state.write().await;
        match &msg {
            StreamMessage::Data(elem) => {
                state.elements_processed += 1;
                state.bytes_processed += elem.size_bytes() as u64;
            }
            StreamMessage::Watermark(wm) => state.watermark = Some(*wm),
            StreamMessage::Checkpoint(id) => state.last_checkpoint = Some(*id),
            StreamMessage::EndOfStream => state.closed = true,
        }
        Ok(msg)
    }

    /// Takes a message if one is immediately available, without waiting.
    ///
    /// Returns `Ok(None)` when the channel is currently empty. Unlike
    /// [`Stream::recv`], this method does not touch the bookkeeping: counters,
    /// watermark, checkpoint and the closed flag stay as they were.
    ///
    /// # Errors
    ///
    /// `StreamingError::RecvError` if the channel is disconnected.
    pub fn try_recv(&self) -> Result<Option<StreamMessage>> {
        match self.receiver.try_recv() {
            Ok(msg) => Ok(Some(msg)),
            Err(crossbeam_channel::TryRecvError::Empty) => Ok(None),
            Err(crossbeam_channel::TryRecvError::Disconnected) => Err(StreamingError::RecvError),
        }
    }

    /// Returns the most recent watermark recorded by `recv`, if any.
    pub async fn watermark(&self) -> Option<DateTime<Utc>> {
        self.state.read().await.watermark
    }

    /// Returns the most recent checkpoint id recorded by `recv`, if any.
    pub async fn last_checkpoint(&self) -> Option<u64> {
        self.state.read().await.last_checkpoint
    }

    /// Returns how many `Data` messages `recv` has handed out.
    pub async fn elements_processed(&self) -> u64 {
        self.state.read().await.elements_processed
    }

    /// Returns the summed `size_bytes()` of the `Data` messages `recv` has
    /// handed out.
    pub async fn bytes_processed(&self) -> u64 {
        self.state.read().await.bytes_processed
    }

    /// Returns whether the stream is marked closed, either through `close` or
    /// because `recv` saw `EndOfStream`.
    pub async fn is_closed(&self) -> bool {
        self.state.read().await.closed
    }

    /// Marks the stream closed so later `send` calls are rejected.
    ///
    /// Only the flag changes: nothing is sent on the channel and messages
    /// already queued stay receivable. Always returns `Ok(())`.
    pub async fn close(&self) -> Result<()> {
        self.state.write().await.closed = true;
        Ok(())
    }

    /// Returns a clone of the channel's sending half. Sends made through it
    /// go straight to the channel and skip the closed check done by `send`.
    pub fn sender(&self) -> Sender<StreamMessage> {
        self.sender.clone()
    }

    /// Returns a clone of the channel's receiving half. Messages taken
    /// through it are not recorded in the stream's bookkeeping.
    pub fn receiver(&self) -> Receiver<StreamMessage> {
        self.receiver.clone()
    }

    /// Returns the configuration the stream was built with.
    pub fn config(&self) -> &StreamConfig {
        &self.config
    }
}
impl Default for Stream {
fn default() -> Self {
Self::new()
}
}
/// `StreamSource` that reads messages from a crossbeam channel receiver.
pub struct ChannelSource {
/// Channel end the messages are taken from.
receiver: Receiver<StreamMessage>,
/// Set by `close`, on `EndOfStream`, or when the channel disconnects; once set, `next` yields `Ok(None)`.
closed: bool,
}
impl ChannelSource {
    /// Wraps `receiver` in a source that starts out open.
    pub fn new(receiver: Receiver<StreamMessage>) -> Self {
        let closed = false;
        Self { receiver, closed }
    }
}
#[async_trait]
impl StreamSource for ChannelSource {
async fn next(&mut self) -> Result<Option<StreamMessage>> {
if self.closed {
return Ok(None);
}
match self.receiver.try_recv() {
Ok(msg) => {
if matches!(msg, StreamMessage::EndOfStream) {
self.closed = true;
}
Ok(Some(msg))
}
Err(crossbeam_channel::TryRecvError::Empty) => Ok(None),
Err(crossbeam_channel::TryRecvError::Disconnected) => {
self.closed = true;
Ok(None)
}
}
}
async fn has_next(&self) -> bool {
!self.closed && !self.receiver.is_empty()
}
async fn close(&mut self) -> Result<()> {
self.closed = true;
Ok(())
}
}
/// `StreamSink` that buffers messages and forwards them to a crossbeam channel sender.
pub struct ChannelSink {
/// Channel end the messages are forwarded to.
sender: Sender<StreamMessage>,
/// Messages accepted by `write` but not yet forwarded.
buffer: Vec<StreamMessage>,
/// Buffered-message count at which `write` triggers a flush.
buffer_size: usize,
}
impl ChannelSink {
    /// Creates a sink that flushes once 100 messages are buffered.
    pub fn new(sender: Sender<StreamMessage>) -> Self {
        const DEFAULT_FLUSH_THRESHOLD: usize = 100;
        Self::with_buffer_size(sender, DEFAULT_FLUSH_THRESHOLD)
    }

    /// Creates a sink that flushes once `buffer_size` messages are buffered.
    /// The internal buffer is pre-allocated to that capacity.
    pub fn with_buffer_size(sender: Sender<StreamMessage>, buffer_size: usize) -> Self {
        let buffer = Vec::with_capacity(buffer_size);
        Self {
            sender,
            buffer,
            buffer_size,
        }
    }
}
#[async_trait]
impl StreamSink for ChannelSink {
    /// Buffers `element` and flushes as soon as the buffer holds at least
    /// `buffer_size` messages.
    async fn write(&mut self, element: StreamMessage) -> Result<()> {
        self.buffer.push(element);
        if self.buffer.len() < self.buffer_size {
            return Ok(());
        }
        self.flush().await
    }

    /// Forwards the buffered messages to the channel in insertion order.
    ///
    /// Stops at the first failed send and reports `StreamingError::SendError`.
    /// The buffer ends up empty either way, because dropping the drain
    /// iterator discards whatever had not been sent yet.
    async fn flush(&mut self) -> Result<()> {
        let outbound = &self.sender;
        let mut pending = self.buffer.drain(..);
        pending.try_for_each(|msg| outbound.send(msg).map_err(|_| StreamingError::SendError))
    }

    /// Flushes the buffer and then emits `StreamMessage::EndOfStream`.
    async fn close(&mut self) -> Result<()> {
        self.flush().await?;
        match self.sender.send(StreamMessage::EndOfStream) {
            Ok(()) => Ok(()),
            Err(_) => Err(StreamingError::SendError),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A new element keeps its payload and event time and carries no key.
    #[tokio::test]
    async fn test_stream_element_creation() {
        let event_time = Utc::now();
        let payload = vec![1, 2, 3, 4];

        let element = StreamElement::new(payload.clone(), event_time);

        assert_eq!(element.data, payload);
        assert_eq!(element.event_time, event_time);
        assert!(element.key.is_none());
    }

    /// A data message sent into a stream comes back out with the same payload.
    #[tokio::test]
    async fn test_stream_send_recv() {
        let stream = Stream::new();
        let sent = StreamElement::new(vec![1, 2, 3], Utc::now());

        let outgoing = StreamMessage::Data(sent.clone());
        stream
            .send(outgoing)
            .await
            .expect("stream send should succeed");

        let incoming = stream.recv().await.expect("stream recv should succeed");
        if let StreamMessage::Data(received) = incoming {
            assert_eq!(received.data, sent.data);
        } else {
            panic!("Expected data message");
        }
    }

    /// Receiving a watermark message updates the stream's stored watermark.
    #[tokio::test]
    async fn test_stream_watermark() {
        let stream = Stream::new();
        let mark = Utc::now();

        stream
            .send(StreamMessage::Watermark(mark))
            .await
            .expect("stream send should succeed");
        let _ = stream.recv().await.expect("stream recv should succeed");

        let stored = stream.watermark().await;
        assert_eq!(stored, Some(mark));
    }

    /// Closing flips the closed flag and makes later sends fail.
    #[tokio::test]
    async fn test_stream_close() {
        let stream = Stream::new();
        assert!(!stream.is_closed().await);

        stream.close().await.expect("stream close should succeed");
        assert!(stream.is_closed().await);

        let rejected = stream.send(StreamMessage::EndOfStream).await;
        assert!(matches!(rejected, Err(StreamingError::StreamClosed)));
    }
}