fukurow_streaming/
stream.rs1use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use std::pin::Pin;
8use futures::stream::Stream;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct StreamConfig {
13 pub stream_type: StreamType,
14 pub topic: String,
15 pub group_id: Option<String>,
16 pub partition: Option<i32>,
17 pub options: std::collections::HashMap<String, String>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub enum StreamType {
23 Kafka,
24 NATS,
25 Redis,
26 RabbitMQ,
27}
28
29#[async_trait]
31pub trait AbstractStream: Send + Sync {
32 async fn send(&self, key: Option<&str>, payload: &[u8]) -> Result<(), StreamError>;
34
35 async fn receive(&self) -> Result<Pin<Box<dyn Stream<Item = Result<StreamMessage, StreamError>> + Send>>, StreamError>;
37
38 async fn close(&self) -> Result<(), StreamError>;
40}
41
/// A single message carried by a stream.
///
/// Besides `Debug` and `Clone`, the type implements:
///
/// * `Default` — an empty message (`key`/`timestamp` are `None`, `payload` and
///   `headers` are empty), which enables struct-update syntax such as
///   `StreamMessage { payload, ..Default::default() }`;
/// * `PartialEq`/`Eq` — field-by-field comparison, convenient in tests and
///   assertions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamMessage {
    /// Optional message key; `None` when the message has no key.
    pub key: Option<String>,

    /// Raw message body bytes.
    pub payload: Vec<u8>,

    /// Optional timestamp; `None` when not available.
    // NOTE(review): unit/epoch of the i64 is not established in this file —
    // confirm against the producers of this value.
    pub timestamp: Option<i64>,

    /// String key/value headers attached to the message.
    pub headers: std::collections::HashMap<String, String>,
}
50
51#[derive(Debug, thiserror::Error)]
53pub enum StreamError {
54 #[error("Channel closed")]
55 ChannelClosed,
56
57 #[error("Processing timeout")]
58 ProcessingTimeout,
59
60 #[error("Connection error: {0}")]
61 ConnectionError(String),
62
63 #[error("Send error: {0}")]
64 SendError(String),
65
66 #[error("Receive error: {0}")]
67 ReceiveError(String),
68
69 #[error("Configuration error: {0}")]
70 ConfigError(String),
71
72 #[error("Processor error: {0}")]
73 ProcessorError(String),
74
75 #[error("Health check failed: {0}")]
76 HealthCheckError(String),
77
78 #[error("Stream closed")]
79 StreamClosed,
80}