sea_streamer_socket/
message.rs

1#[cfg(feature = "backend-file")]
2use sea_streamer_file::FileMessage;
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::KafkaMessage;
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::RedisMessage;
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioMessage;
9
10use crate::{Backend, SeaStreamerBackend};
11use sea_streamer_types::{Message, Payload, SeqNo, ShardId, StreamKey, Timestamp};
12
/// `sea-streamer-socket` concrete type of Message.
///
/// Each variant wraps the message type of one backend and is only compiled in
/// when the corresponding `backend-*` feature is enabled.
#[derive(Debug)]
pub enum SeaMessage<'a> {
    /// Wraps a `KafkaMessage`; the only variant that borrows for `'a`.
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaMessage<'a>),
    /// Wraps a `RedisMessage`.
    #[cfg(feature = "backend-redis")]
    Redis(RedisMessage),
    /// Wraps a `StdioMessage`.
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioMessage),
    /// Wraps a `FileMessage`.
    #[cfg(feature = "backend-file")]
    File(FileMessage),
    /// Placeholder that exists only when `backend-kafka` is disabled. Without
    /// the Kafka variant nothing else mentions `'a`, so this marker keeps the
    /// lifetime parameter in use.
    #[cfg(not(feature = "backend-kafka"))]
    None(std::marker::PhantomData<&'a ()>),
}
27
28impl<'a> SeaStreamerBackend for SeaMessage<'a> {
29    #[cfg(feature = "backend-kafka")]
30    type Kafka = KafkaMessage<'a>;
31    #[cfg(feature = "backend-redis")]
32    type Redis = RedisMessage;
33    #[cfg(feature = "backend-stdio")]
34    type Stdio = StdioMessage;
35    #[cfg(feature = "backend-file")]
36    type File = FileMessage;
37
38    fn backend(&self) -> Backend {
39        match self {
40            #[cfg(feature = "backend-kafka")]
41            Self::Kafka(_) => Backend::Kafka,
42            #[cfg(feature = "backend-redis")]
43            Self::Redis(_) => Backend::Redis,
44            #[cfg(feature = "backend-stdio")]
45            Self::Stdio(_) => Backend::Stdio,
46            #[cfg(feature = "backend-file")]
47            Self::File(_) => Backend::File,
48            #[cfg(not(feature = "backend-kafka"))]
49            Self::None(_) => unreachable!(),
50        }
51    }
52
53    #[cfg(feature = "backend-kafka")]
54    fn get_kafka(&mut self) -> Option<&mut KafkaMessage<'a>> {
55        match self {
56            Self::Kafka(s) => Some(s),
57            #[cfg(feature = "backend-redis")]
58            Self::Redis(_) => None,
59            #[cfg(feature = "backend-stdio")]
60            Self::Stdio(_) => None,
61            #[cfg(feature = "backend-file")]
62            Self::File(_) => None,
63        }
64    }
65
66    #[cfg(feature = "backend-redis")]
67    fn get_redis(&mut self) -> Option<&mut RedisMessage> {
68        match self {
69            #[cfg(feature = "backend-kafka")]
70            Self::Kafka(_) => None,
71            Self::Redis(s) => Some(s),
72            #[cfg(feature = "backend-stdio")]
73            Self::Stdio(_) => None,
74            #[cfg(feature = "backend-file")]
75            Self::File(_) => None,
76            #[cfg(not(feature = "backend-kafka"))]
77            Self::None(_) => None,
78        }
79    }
80
81    #[cfg(feature = "backend-stdio")]
82    fn get_stdio(&mut self) -> Option<&mut StdioMessage> {
83        match self {
84            #[cfg(feature = "backend-kafka")]
85            Self::Kafka(_) => None,
86            #[cfg(feature = "backend-redis")]
87            Self::Redis(_) => None,
88            Self::Stdio(s) => Some(s),
89            #[cfg(feature = "backend-file")]
90            Self::File(_) => None,
91            #[cfg(not(feature = "backend-kafka"))]
92            Self::None(_) => None,
93        }
94    }
95
96    #[cfg(feature = "backend-file")]
97    fn get_file(&mut self) -> Option<&mut FileMessage> {
98        match self {
99            #[cfg(feature = "backend-kafka")]
100            Self::Kafka(_) => None,
101            #[cfg(feature = "backend-redis")]
102            Self::Redis(_) => None,
103            #[cfg(feature = "backend-stdio")]
104            Self::Stdio(_) => None,
105            Self::File(s) => Some(s),
106            #[cfg(not(feature = "backend-kafka"))]
107            Self::None(_) => None,
108        }
109    }
110}
111
112impl<'a> Message for SeaMessage<'a> {
113    fn stream_key(&self) -> StreamKey {
114        match self {
115            #[cfg(feature = "backend-kafka")]
116            Self::Kafka(i) => i.stream_key(),
117            #[cfg(feature = "backend-redis")]
118            Self::Redis(i) => i.stream_key(),
119            #[cfg(feature = "backend-stdio")]
120            Self::Stdio(i) => i.stream_key(),
121            #[cfg(feature = "backend-file")]
122            Self::File(i) => i.stream_key(),
123            #[cfg(not(feature = "backend-kafka"))]
124            Self::None(_) => unreachable!(),
125        }
126    }
127
128    fn shard_id(&self) -> ShardId {
129        match self {
130            #[cfg(feature = "backend-kafka")]
131            Self::Kafka(i) => i.shard_id(),
132            #[cfg(feature = "backend-redis")]
133            Self::Redis(i) => i.shard_id(),
134            #[cfg(feature = "backend-stdio")]
135            Self::Stdio(i) => i.shard_id(),
136            #[cfg(feature = "backend-file")]
137            Self::File(i) => i.shard_id(),
138            #[cfg(not(feature = "backend-kafka"))]
139            Self::None(_) => unreachable!(),
140        }
141    }
142
143    fn sequence(&self) -> SeqNo {
144        match self {
145            #[cfg(feature = "backend-kafka")]
146            Self::Kafka(i) => i.sequence(),
147            #[cfg(feature = "backend-redis")]
148            Self::Redis(i) => i.sequence(),
149            #[cfg(feature = "backend-stdio")]
150            Self::Stdio(i) => i.sequence(),
151            #[cfg(feature = "backend-file")]
152            Self::File(i) => i.sequence(),
153            #[cfg(not(feature = "backend-kafka"))]
154            Self::None(_) => unreachable!(),
155        }
156    }
157
158    fn timestamp(&self) -> Timestamp {
159        match self {
160            #[cfg(feature = "backend-kafka")]
161            Self::Kafka(i) => i.timestamp(),
162            #[cfg(feature = "backend-redis")]
163            Self::Redis(i) => i.timestamp(),
164            #[cfg(feature = "backend-stdio")]
165            Self::Stdio(i) => i.timestamp(),
166            #[cfg(feature = "backend-file")]
167            Self::File(i) => i.timestamp(),
168            #[cfg(not(feature = "backend-kafka"))]
169            Self::None(_) => unreachable!(),
170        }
171    }
172
173    fn message(&self) -> Payload {
174        match self {
175            #[cfg(feature = "backend-kafka")]
176            Self::Kafka(i) => i.message(),
177            #[cfg(feature = "backend-redis")]
178            Self::Redis(i) => i.message(),
179            #[cfg(feature = "backend-stdio")]
180            Self::Stdio(i) => i.message(),
181            #[cfg(feature = "backend-file")]
182            Self::File(i) => i.message(),
183            #[cfg(not(feature = "backend-kafka"))]
184            Self::None(_) => unreachable!(),
185        }
186    }
187}