1#[cfg(feature = "backend-file")]
2use sea_streamer_file::FileConsumer;
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::KafkaConsumer;
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::RedisConsumer;
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioConsumer;
9
10use crate::{map_err, Backend, BackendErr, SeaMessage, SeaResult, SeaStreamerBackend};
11use sea_streamer_types::{
12 export::futures::{FutureExt, Stream},
13 Consumer, SeqPos, ShardId, StreamKey, StreamResult, Timestamp,
14};
15use std::{fmt::Debug, future::Future, pin::Pin, task::Poll};
16
17#[derive(Debug)]
18pub struct SeaConsumer {
20 pub(crate) backend: SeaConsumerBackend,
21}
22
/// The concrete consumer stored inside a [`SeaConsumer`].
///
/// Each variant is compiled in only when its matching `backend-*` cargo
/// feature is enabled.
#[derive(Debug)]
pub(crate) enum SeaConsumerBackend {
    /// Consumer provided by `sea_streamer_kafka`.
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaConsumer),
    /// Consumer provided by `sea_streamer_redis`.
    #[cfg(feature = "backend-redis")]
    Redis(RedisConsumer),
    /// Consumer provided by `sea_streamer_stdio`.
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioConsumer),
    /// Consumer provided by `sea_streamer_file`.
    #[cfg(feature = "backend-file")]
    File(FileConsumer),
}
34
35pub enum NextFuture<'a> {
37 #[cfg(feature = "backend-kafka")]
38 Kafka(sea_streamer_kafka::NextFuture<'a>),
39 #[cfg(feature = "backend-redis")]
40 Redis(sea_streamer_redis::NextFuture<'a>),
41 #[cfg(feature = "backend-stdio")]
42 Stdio(sea_streamer_stdio::NextFuture<'a>),
43 #[cfg(feature = "backend-file")]
44 File(sea_streamer_file::NextFuture<'a>),
45}
46
47pub enum SeaMessageStream<'a> {
49 #[cfg(feature = "backend-kafka")]
50 Kafka(sea_streamer_kafka::KafkaMessageStream<'a>),
51 #[cfg(feature = "backend-redis")]
52 Redis(sea_streamer_redis::RedisMessageStream<'a>),
53 #[cfg(feature = "backend-stdio")]
54 Stdio(sea_streamer_stdio::StdioMessageStream<'a>),
55 #[cfg(feature = "backend-file")]
56 File(sea_streamer_file::FileMessageStream<'a>),
57}
58
59impl<'a> Debug for NextFuture<'a> {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 match self {
62 #[cfg(feature = "backend-kafka")]
63 Self::Kafka(_) => write!(f, "NextFuture::Kafka"),
64 #[cfg(feature = "backend-redis")]
65 Self::Redis(_) => write!(f, "NextFuture::Redis"),
66 #[cfg(feature = "backend-stdio")]
67 Self::Stdio(_) => write!(f, "NextFuture::Stdio"),
68 #[cfg(feature = "backend-file")]
69 Self::File(_) => write!(f, "NextFuture::File"),
70 }
71 }
72}
73
74impl<'a> Debug for SeaMessageStream<'a> {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match self {
77 #[cfg(feature = "backend-kafka")]
78 Self::Kafka(_) => write!(f, "KafkaMessageStream"),
79 #[cfg(feature = "backend-redis")]
80 Self::Redis(_) => write!(f, "RedisMessageStream"),
81 #[cfg(feature = "backend-stdio")]
82 Self::Stdio(_) => write!(f, "StdioMessageStream"),
83 #[cfg(feature = "backend-file")]
84 Self::File(_) => write!(f, "FileMessageStream"),
85 }
86 }
87}
88
#[cfg(feature = "backend-kafka")]
impl From<KafkaConsumer> for SeaConsumer {
    /// Wraps a Kafka consumer in a backend-agnostic [`SeaConsumer`].
    fn from(consumer: KafkaConsumer) -> Self {
        let backend = SeaConsumerBackend::Kafka(consumer);
        Self { backend }
    }
}
97
#[cfg(feature = "backend-redis")]
impl From<RedisConsumer> for SeaConsumer {
    /// Wraps a Redis consumer in a backend-agnostic [`SeaConsumer`].
    fn from(consumer: RedisConsumer) -> Self {
        let backend = SeaConsumerBackend::Redis(consumer);
        Self { backend }
    }
}
106
#[cfg(feature = "backend-stdio")]
impl From<StdioConsumer> for SeaConsumer {
    /// Wraps a Stdio consumer in a backend-agnostic [`SeaConsumer`].
    fn from(consumer: StdioConsumer) -> Self {
        let backend = SeaConsumerBackend::Stdio(consumer);
        Self { backend }
    }
}
115
#[cfg(feature = "backend-file")]
impl From<FileConsumer> for SeaConsumer {
    /// Wraps a File consumer in a backend-agnostic [`SeaConsumer`].
    fn from(consumer: FileConsumer) -> Self {
        let backend = SeaConsumerBackend::File(consumer);
        Self { backend }
    }
}
124
125impl SeaStreamerBackend for SeaConsumer {
126 #[cfg(feature = "backend-kafka")]
127 type Kafka = KafkaConsumer;
128 #[cfg(feature = "backend-redis")]
129 type Redis = RedisConsumer;
130 #[cfg(feature = "backend-stdio")]
131 type Stdio = StdioConsumer;
132 #[cfg(feature = "backend-file")]
133 type File = FileConsumer;
134
135 fn backend(&self) -> Backend {
136 match self.backend {
137 #[cfg(feature = "backend-kafka")]
138 SeaConsumerBackend::Kafka(_) => Backend::Kafka,
139 #[cfg(feature = "backend-redis")]
140 SeaConsumerBackend::Redis(_) => Backend::Redis,
141 #[cfg(feature = "backend-stdio")]
142 SeaConsumerBackend::Stdio(_) => Backend::Stdio,
143 #[cfg(feature = "backend-file")]
144 SeaConsumerBackend::File(_) => Backend::File,
145 }
146 }
147
148 #[cfg(feature = "backend-kafka")]
149 fn get_kafka(&mut self) -> Option<&mut Self::Kafka> {
150 match &mut self.backend {
151 SeaConsumerBackend::Kafka(s) => Some(s),
152 #[cfg(feature = "backend-redis")]
153 SeaConsumerBackend::Redis(_) => None,
154 #[cfg(feature = "backend-stdio")]
155 SeaConsumerBackend::Stdio(_) => None,
156 #[cfg(feature = "backend-file")]
157 SeaConsumerBackend::File(_) => None,
158 }
159 }
160
161 #[cfg(feature = "backend-redis")]
162 fn get_redis(&mut self) -> Option<&mut Self::Redis> {
163 match &mut self.backend {
164 #[cfg(feature = "backend-kafka")]
165 SeaConsumerBackend::Kafka(_) => None,
166 SeaConsumerBackend::Redis(s) => Some(s),
167 #[cfg(feature = "backend-stdio")]
168 SeaConsumerBackend::Stdio(_) => None,
169 #[cfg(feature = "backend-file")]
170 SeaConsumerBackend::File(_) => None,
171 }
172 }
173
174 #[cfg(feature = "backend-stdio")]
175 fn get_stdio(&mut self) -> Option<&mut Self::Stdio> {
176 match &mut self.backend {
177 #[cfg(feature = "backend-kafka")]
178 SeaConsumerBackend::Kafka(_) => None,
179 #[cfg(feature = "backend-redis")]
180 SeaConsumerBackend::Redis(_) => None,
181 SeaConsumerBackend::Stdio(s) => Some(s),
182 #[cfg(feature = "backend-file")]
183 SeaConsumerBackend::File(_) => None,
184 }
185 }
186
187 #[cfg(feature = "backend-file")]
188 fn get_file(&mut self) -> Option<&mut Self::File> {
189 match &mut self.backend {
190 #[cfg(feature = "backend-kafka")]
191 SeaConsumerBackend::Kafka(_) => None,
192 #[cfg(feature = "backend-redis")]
193 SeaConsumerBackend::Redis(_) => None,
194 #[cfg(feature = "backend-stdio")]
195 SeaConsumerBackend::Stdio(_) => None,
196 SeaConsumerBackend::File(s) => Some(s),
197 }
198 }
199}
200
201impl Consumer for SeaConsumer {
202 type Error = BackendErr;
203 type Message<'a> = SeaMessage<'a>;
204 type NextFuture<'a> = NextFuture<'a>;
205 type Stream<'a> = SeaMessageStream<'a>;
206
207 async fn seek(&mut self, to: Timestamp) -> SeaResult<()> {
208 match &mut self.backend {
209 #[cfg(feature = "backend-kafka")]
210 SeaConsumerBackend::Kafka(i) => i.seek(to).await.map_err(map_err),
211 #[cfg(feature = "backend-redis")]
212 SeaConsumerBackend::Redis(i) => i.seek(to).await.map_err(map_err),
213 #[cfg(feature = "backend-stdio")]
214 SeaConsumerBackend::Stdio(i) => i.seek(to).await.map_err(map_err),
215 #[cfg(feature = "backend-file")]
216 SeaConsumerBackend::File(i) => i.seek(to).await.map_err(map_err),
217 }
218 }
219
220 async fn rewind(&mut self, pos: SeqPos) -> SeaResult<()> {
221 match &mut self.backend {
222 #[cfg(feature = "backend-kafka")]
223 SeaConsumerBackend::Kafka(i) => i.rewind(pos).await.map_err(map_err),
224 #[cfg(feature = "backend-redis")]
225 SeaConsumerBackend::Redis(i) => i.rewind(pos).await.map_err(map_err),
226 #[cfg(feature = "backend-stdio")]
227 SeaConsumerBackend::Stdio(i) => i.rewind(pos).await.map_err(map_err),
228 #[cfg(feature = "backend-file")]
229 SeaConsumerBackend::File(i) => i.rewind(pos).await.map_err(map_err),
230 }
231 }
232
233 fn assign(&mut self, ss: (StreamKey, ShardId)) -> SeaResult<()> {
234 match &mut self.backend {
235 #[cfg(feature = "backend-kafka")]
236 SeaConsumerBackend::Kafka(i) => i.assign(ss).map_err(map_err),
237 #[cfg(feature = "backend-redis")]
238 SeaConsumerBackend::Redis(i) => i.assign(ss).map_err(map_err),
239 #[cfg(feature = "backend-stdio")]
240 SeaConsumerBackend::Stdio(i) => i.assign(ss).map_err(map_err),
241 #[cfg(feature = "backend-file")]
242 SeaConsumerBackend::File(i) => i.assign(ss).map_err(map_err),
243 }
244 }
245
246 fn unassign(&mut self, ss: (StreamKey, ShardId)) -> SeaResult<()> {
247 match &mut self.backend {
248 #[cfg(feature = "backend-kafka")]
249 SeaConsumerBackend::Kafka(i) => i.unassign(ss).map_err(map_err),
250 #[cfg(feature = "backend-redis")]
251 SeaConsumerBackend::Redis(i) => i.unassign(ss).map_err(map_err),
252 #[cfg(feature = "backend-stdio")]
253 SeaConsumerBackend::Stdio(i) => i.unassign(ss).map_err(map_err),
254 #[cfg(feature = "backend-file")]
255 SeaConsumerBackend::File(i) => i.unassign(ss).map_err(map_err),
256 }
257 }
258
259 fn next(&self) -> Self::NextFuture<'_> {
260 match &self.backend {
261 #[cfg(feature = "backend-kafka")]
262 SeaConsumerBackend::Kafka(i) => NextFuture::Kafka(i.next()),
263 #[cfg(feature = "backend-redis")]
264 SeaConsumerBackend::Redis(i) => NextFuture::Redis(i.next()),
265 #[cfg(feature = "backend-stdio")]
266 SeaConsumerBackend::Stdio(i) => NextFuture::Stdio(i.next()),
267 #[cfg(feature = "backend-file")]
268 SeaConsumerBackend::File(i) => NextFuture::File(i.next()),
269 }
270 }
271
272 fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
273 match &mut self.backend {
274 #[cfg(feature = "backend-kafka")]
275 SeaConsumerBackend::Kafka(i) => SeaMessageStream::Kafka(i.stream()),
276 #[cfg(feature = "backend-redis")]
277 SeaConsumerBackend::Redis(i) => SeaMessageStream::Redis(i.stream()),
278 #[cfg(feature = "backend-stdio")]
279 SeaConsumerBackend::Stdio(i) => SeaMessageStream::Stdio(i.stream()),
280 #[cfg(feature = "backend-file")]
281 SeaConsumerBackend::File(i) => SeaMessageStream::File(i.stream()),
282 }
283 }
284}
285
286impl<'a> Future for NextFuture<'a> {
287 type Output = StreamResult<SeaMessage<'a>, BackendErr>;
288
289 fn poll(
290 self: Pin<&mut Self>,
291 cx: &mut std::task::Context<'_>,
292 ) -> std::task::Poll<Self::Output> {
293 match Pin::into_inner(self) {
294 #[cfg(feature = "backend-kafka")]
295 Self::Kafka(fut) => match Pin::new(fut).poll_unpin(cx) {
296 Poll::Ready(res) => Poll::Ready(res.map(SeaMessage::Kafka).map_err(map_err)),
297 Poll::Pending => Poll::Pending,
298 },
299 #[cfg(feature = "backend-redis")]
300 Self::Redis(fut) => match Pin::new(fut).poll_unpin(cx) {
301 Poll::Ready(res) => Poll::Ready(res.map(SeaMessage::Redis).map_err(map_err)),
302 Poll::Pending => Poll::Pending,
303 },
304 #[cfg(feature = "backend-stdio")]
305 Self::Stdio(fut) => match Pin::new(fut).poll_unpin(cx) {
306 Poll::Ready(res) => Poll::Ready(res.map(SeaMessage::Stdio).map_err(map_err)),
307 Poll::Pending => Poll::Pending,
308 },
309 #[cfg(feature = "backend-file")]
310 Self::File(fut) => match Pin::new(fut).poll_unpin(cx) {
311 Poll::Ready(res) => Poll::Ready(res.map(SeaMessage::File).map_err(map_err)),
312 Poll::Pending => Poll::Pending,
313 },
314 }
315 }
316}
317
318impl<'a> Stream for SeaMessageStream<'a> {
319 type Item = StreamResult<SeaMessage<'a>, BackendErr>;
320
321 fn poll_next(
322 self: Pin<&mut Self>,
323 cx: &mut std::task::Context<'_>,
324 ) -> Poll<Option<Self::Item>> {
325 match Pin::into_inner(self) {
326 #[cfg(feature = "backend-kafka")]
327 Self::Kafka(ss) => match Pin::new(ss).poll_next(cx) {
328 Poll::Ready(Some(res)) => {
329 Poll::Ready(Some(res.map(SeaMessage::Kafka).map_err(map_err)))
330 }
331 Poll::Ready(None) => Poll::Ready(None),
332 Poll::Pending => Poll::Pending,
333 },
334 #[cfg(feature = "backend-redis")]
335 Self::Redis(ss) => match Pin::new(ss).poll_next(cx) {
336 Poll::Ready(Some(res)) => {
337 Poll::Ready(Some(res.map(SeaMessage::Redis).map_err(map_err)))
338 }
339 Poll::Ready(None) => Poll::Ready(None),
340 Poll::Pending => Poll::Pending,
341 },
342 #[cfg(feature = "backend-stdio")]
343 Self::Stdio(ss) => match Pin::new(ss).poll_next(cx) {
344 Poll::Ready(Some(res)) => {
345 Poll::Ready(Some(res.map(SeaMessage::Stdio).map_err(map_err)))
346 }
347 Poll::Ready(None) => Poll::Ready(None),
348 Poll::Pending => Poll::Pending,
349 },
350 #[cfg(feature = "backend-file")]
351 Self::File(ss) => match Pin::new(ss).poll_next(cx) {
352 Poll::Ready(Some(res)) => {
353 Poll::Ready(Some(res.map(SeaMessage::File).map_err(map_err)))
354 }
355 Poll::Ready(None) => Poll::Ready(None),
356 Poll::Pending => Poll::Pending,
357 },
358 }
359 }
360}