fukurow_streaming/
stream.rs

1//! # Stream Abstraction
2//!
3//! Abstract stream interface for different streaming platforms
4
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use std::pin::Pin;
8use futures::stream::Stream;
9
10/// Stream configuration
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct StreamConfig {
13    pub stream_type: StreamType,
14    pub topic: String,
15    pub group_id: Option<String>,
16    pub partition: Option<i32>,
17    pub options: std::collections::HashMap<String, String>,
18}
19
20/// Stream type
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub enum StreamType {
23    Kafka,
24    NATS,
25    Redis,
26    RabbitMQ,
27}
28
29/// Abstract stream interface
30#[async_trait]
31pub trait AbstractStream: Send + Sync {
32    /// Send a message to the stream
33    async fn send(&self, key: Option<&str>, payload: &[u8]) -> Result<(), StreamError>;
34
35    /// Receive messages from the stream
36    async fn receive(&self) -> Result<Pin<Box<dyn Stream<Item = Result<StreamMessage, StreamError>> + Send>>, StreamError>;
37
38    /// Close the stream connection
39    async fn close(&self) -> Result<(), StreamError>;
40}
41
42/// Stream message
43#[derive(Debug, Clone)]
44pub struct StreamMessage {
45    pub key: Option<String>,
46    pub payload: Vec<u8>,
47    pub timestamp: Option<i64>,
48    pub headers: std::collections::HashMap<String, String>,
49}
50
51/// Stream error
52#[derive(Debug, thiserror::Error)]
53pub enum StreamError {
54    #[error("Channel closed")]
55    ChannelClosed,
56
57    #[error("Processing timeout")]
58    ProcessingTimeout,
59
60    #[error("Connection error: {0}")]
61    ConnectionError(String),
62
63    #[error("Send error: {0}")]
64    SendError(String),
65
66    #[error("Receive error: {0}")]
67    ReceiveError(String),
68
69    #[error("Configuration error: {0}")]
70    ConfigError(String),
71
72    #[error("Processor error: {0}")]
73    ProcessorError(String),
74
75    #[error("Health check failed: {0}")]
76    HealthCheckError(String),
77
78    #[error("Stream closed")]
79    StreamClosed,
80}