sea_streamer_socket/
consumer.rs

1#[cfg(feature = "backend-file")]
2use sea_streamer_file::FileConsumer;
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::KafkaConsumer;
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::RedisConsumer;
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioConsumer;
9
10use crate::{map_err, Backend, BackendErr, SeaMessage, SeaResult, SeaStreamerBackend};
11use sea_streamer_types::{
12    export::futures::{FutureExt, Stream},
13    Consumer, SeqPos, ShardId, StreamKey, StreamResult, Timestamp,
14};
15use std::{fmt::Debug, future::Future, pin::Pin, task::Poll};
16
17#[derive(Debug)]
18/// `sea-streamer-socket` concrete type of Consumer.
19pub struct SeaConsumer {
20    pub(crate) backend: SeaConsumerBackend,
21}
22
/// The backend-specific consumer stored inside a [`SeaConsumer`].
///
/// Each variant only exists when its `backend-*` feature is enabled.
#[derive(Debug)]
pub(crate) enum SeaConsumerBackend {
    /// Consumer from `sea_streamer_kafka`.
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaConsumer),
    /// Consumer from `sea_streamer_redis`.
    #[cfg(feature = "backend-redis")]
    Redis(RedisConsumer),
    /// Consumer from `sea_streamer_stdio`.
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioConsumer),
    /// Consumer from `sea_streamer_file`.
    #[cfg(feature = "backend-file")]
    File(FileConsumer),
}
34
35/// `sea-streamer-socket` concrete type of Future that will yield the next message.
36pub enum NextFuture<'a> {
37    #[cfg(feature = "backend-kafka")]
38    Kafka(sea_streamer_kafka::NextFuture<'a>),
39    #[cfg(feature = "backend-redis")]
40    Redis(sea_streamer_redis::NextFuture<'a>),
41    #[cfg(feature = "backend-stdio")]
42    Stdio(sea_streamer_stdio::NextFuture<'a>),
43    #[cfg(feature = "backend-file")]
44    File(sea_streamer_file::NextFuture<'a>),
45}
46
47/// `sea-streamer-socket` concrete type of Stream that will yield the next messages.
48pub enum SeaMessageStream<'a> {
49    #[cfg(feature = "backend-kafka")]
50    Kafka(sea_streamer_kafka::KafkaMessageStream<'a>),
51    #[cfg(feature = "backend-redis")]
52    Redis(sea_streamer_redis::RedisMessageStream<'a>),
53    #[cfg(feature = "backend-stdio")]
54    Stdio(sea_streamer_stdio::StdioMessageStream<'a>),
55    #[cfg(feature = "backend-file")]
56    File(sea_streamer_file::FileMessageStream<'a>),
57}
58
59impl<'a> Debug for NextFuture<'a> {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            #[cfg(feature = "backend-kafka")]
63            Self::Kafka(_) => write!(f, "NextFuture::Kafka"),
64            #[cfg(feature = "backend-redis")]
65            Self::Redis(_) => write!(f, "NextFuture::Redis"),
66            #[cfg(feature = "backend-stdio")]
67            Self::Stdio(_) => write!(f, "NextFuture::Stdio"),
68            #[cfg(feature = "backend-file")]
69            Self::File(_) => write!(f, "NextFuture::File"),
70        }
71    }
72}
73
74impl<'a> Debug for SeaMessageStream<'a> {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        match self {
77            #[cfg(feature = "backend-kafka")]
78            Self::Kafka(_) => write!(f, "KafkaMessageStream"),
79            #[cfg(feature = "backend-redis")]
80            Self::Redis(_) => write!(f, "RedisMessageStream"),
81            #[cfg(feature = "backend-stdio")]
82            Self::Stdio(_) => write!(f, "StdioMessageStream"),
83            #[cfg(feature = "backend-file")]
84            Self::File(_) => write!(f, "FileMessageStream"),
85        }
86    }
87}
88
#[cfg(feature = "backend-kafka")]
impl From<KafkaConsumer> for SeaConsumer {
    /// Wraps a Kafka consumer in the backend-agnostic [`SeaConsumer`].
    fn from(consumer: KafkaConsumer) -> Self {
        let backend = SeaConsumerBackend::Kafka(consumer);
        Self { backend }
    }
}
97
#[cfg(feature = "backend-redis")]
impl From<RedisConsumer> for SeaConsumer {
    /// Wraps a Redis consumer in the backend-agnostic [`SeaConsumer`].
    fn from(consumer: RedisConsumer) -> Self {
        let backend = SeaConsumerBackend::Redis(consumer);
        Self { backend }
    }
}
106
#[cfg(feature = "backend-stdio")]
impl From<StdioConsumer> for SeaConsumer {
    /// Wraps a Stdio consumer in the backend-agnostic [`SeaConsumer`].
    fn from(consumer: StdioConsumer) -> Self {
        let backend = SeaConsumerBackend::Stdio(consumer);
        Self { backend }
    }
}
115
#[cfg(feature = "backend-file")]
impl From<FileConsumer> for SeaConsumer {
    /// Wraps a File consumer in the backend-agnostic [`SeaConsumer`].
    fn from(consumer: FileConsumer) -> Self {
        let backend = SeaConsumerBackend::File(consumer);
        Self { backend }
    }
}
124
125impl SeaStreamerBackend for SeaConsumer {
126    #[cfg(feature = "backend-kafka")]
127    type Kafka = KafkaConsumer;
128    #[cfg(feature = "backend-redis")]
129    type Redis = RedisConsumer;
130    #[cfg(feature = "backend-stdio")]
131    type Stdio = StdioConsumer;
132    #[cfg(feature = "backend-file")]
133    type File = FileConsumer;
134
135    fn backend(&self) -> Backend {
136        match self.backend {
137            #[cfg(feature = "backend-kafka")]
138            SeaConsumerBackend::Kafka(_) => Backend::Kafka,
139            #[cfg(feature = "backend-redis")]
140            SeaConsumerBackend::Redis(_) => Backend::Redis,
141            #[cfg(feature = "backend-stdio")]
142            SeaConsumerBackend::Stdio(_) => Backend::Stdio,
143            #[cfg(feature = "backend-file")]
144            SeaConsumerBackend::File(_) => Backend::File,
145        }
146    }
147
148    #[cfg(feature = "backend-kafka")]
149    fn get_kafka(&mut self) -> Option<&mut Self::Kafka> {
150        match &mut self.backend {
151            SeaConsumerBackend::Kafka(s) => Some(s),
152            #[cfg(feature = "backend-redis")]
153            SeaConsumerBackend::Redis(_) => None,
154            #[cfg(feature = "backend-stdio")]
155            SeaConsumerBackend::Stdio(_) => None,
156            #[cfg(feature = "backend-file")]
157            SeaConsumerBackend::File(_) => None,
158        }
159    }
160
161    #[cfg(feature = "backend-redis")]
162    fn get_redis(&mut self) -> Option<&mut Self::Redis> {
163        match &mut self.backend {
164            #[cfg(feature = "backend-kafka")]
165            SeaConsumerBackend::Kafka(_) => None,
166            SeaConsumerBackend::Redis(s) => Some(s),
167            #[cfg(feature = "backend-stdio")]
168            SeaConsumerBackend::Stdio(_) => None,
169            #[cfg(feature = "backend-file")]
170            SeaConsumerBackend::File(_) => None,
171        }
172    }
173
174    #[cfg(feature = "backend-stdio")]
175    fn get_stdio(&mut self) -> Option<&mut Self::Stdio> {
176        match &mut self.backend {
177            #[cfg(feature = "backend-kafka")]
178            SeaConsumerBackend::Kafka(_) => None,
179            #[cfg(feature = "backend-redis")]
180            SeaConsumerBackend::Redis(_) => None,
181            SeaConsumerBackend::Stdio(s) => Some(s),
182            #[cfg(feature = "backend-file")]
183            SeaConsumerBackend::File(_) => None,
184        }
185    }
186
187    #[cfg(feature = "backend-file")]
188    fn get_file(&mut self) -> Option<&mut Self::File> {
189        match &mut self.backend {
190            #[cfg(feature = "backend-kafka")]
191            SeaConsumerBackend::Kafka(_) => None,
192            #[cfg(feature = "backend-redis")]
193            SeaConsumerBackend::Redis(_) => None,
194            #[cfg(feature = "backend-stdio")]
195            SeaConsumerBackend::Stdio(_) => None,
196            SeaConsumerBackend::File(s) => Some(s),
197        }
198    }
199}
200
201impl Consumer for SeaConsumer {
202    type Error = BackendErr;
203    type Message<'a> = SeaMessage<'a>;
204    type NextFuture<'a> = NextFuture<'a>;
205    type Stream<'a> = SeaMessageStream<'a>;
206
207    async fn seek(&mut self, to: Timestamp) -> SeaResult<()> {
208        match &mut self.backend {
209            #[cfg(feature = "backend-kafka")]
210            SeaConsumerBackend::Kafka(i) => i.seek(to).await.map_err(map_err),
211            #[cfg(feature = "backend-redis")]
212            SeaConsumerBackend::Redis(i) => i.seek(to).await.map_err(map_err),
213            #[cfg(feature = "backend-stdio")]
214            SeaConsumerBackend::Stdio(i) => i.seek(to).await.map_err(map_err),
215            #[cfg(feature = "backend-file")]
216            SeaConsumerBackend::File(i) => i.seek(to).await.map_err(map_err),
217        }
218    }
219
220    async fn rewind(&mut self, pos: SeqPos) -> SeaResult<()> {
221        match &mut self.backend {
222            #[cfg(feature = "backend-kafka")]
223            SeaConsumerBackend::Kafka(i) => i.rewind(pos).await.map_err(map_err),
224            #[cfg(feature = "backend-redis")]
225            SeaConsumerBackend::Redis(i) => i.rewind(pos).await.map_err(map_err),
226            #[cfg(feature = "backend-stdio")]
227            SeaConsumerBackend::Stdio(i) => i.rewind(pos).await.map_err(map_err),
228            #[cfg(feature = "backend-file")]
229            SeaConsumerBackend::File(i) => i.rewind(pos).await.map_err(map_err),
230        }
231    }
232
233    fn assign(&mut self, ss: (StreamKey, ShardId)) -> SeaResult<()> {
234        match &mut self.backend {
235            #[cfg(feature = "backend-kafka")]
236            SeaConsumerBackend::Kafka(i) => i.assign(ss).map_err(map_err),
237            #[cfg(feature = "backend-redis")]
238            SeaConsumerBackend::Redis(i) => i.assign(ss).map_err(map_err),
239            #[cfg(feature = "backend-stdio")]
240            SeaConsumerBackend::Stdio(i) => i.assign(ss).map_err(map_err),
241            #[cfg(feature = "backend-file")]
242            SeaConsumerBackend::File(i) => i.assign(ss).map_err(map_err),
243        }
244    }
245
246    fn unassign(&mut self, ss: (StreamKey, ShardId)) -> SeaResult<()> {
247        match &mut self.backend {
248            #[cfg(feature = "backend-kafka")]
249            SeaConsumerBackend::Kafka(i) => i.unassign(ss).map_err(map_err),
250            #[cfg(feature = "backend-redis")]
251            SeaConsumerBackend::Redis(i) => i.unassign(ss).map_err(map_err),
252            #[cfg(feature = "backend-stdio")]
253            SeaConsumerBackend::Stdio(i) => i.unassign(ss).map_err(map_err),
254            #[cfg(feature = "backend-file")]
255            SeaConsumerBackend::File(i) => i.unassign(ss).map_err(map_err),
256        }
257    }
258
259    fn next(&self) -> Self::NextFuture<'_> {
260        match &self.backend {
261            #[cfg(feature = "backend-kafka")]
262            SeaConsumerBackend::Kafka(i) => NextFuture::Kafka(i.next()),
263            #[cfg(feature = "backend-redis")]
264            SeaConsumerBackend::Redis(i) => NextFuture::Redis(i.next()),
265            #[cfg(feature = "backend-stdio")]
266            SeaConsumerBackend::Stdio(i) => NextFuture::Stdio(i.next()),
267            #[cfg(feature = "backend-file")]
268            SeaConsumerBackend::File(i) => NextFuture::File(i.next()),
269        }
270    }
271
272    fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
273        match &mut self.backend {
274            #[cfg(feature = "backend-kafka")]
275            SeaConsumerBackend::Kafka(i) => SeaMessageStream::Kafka(i.stream()),
276            #[cfg(feature = "backend-redis")]
277            SeaConsumerBackend::Redis(i) => SeaMessageStream::Redis(i.stream()),
278            #[cfg(feature = "backend-stdio")]
279            SeaConsumerBackend::Stdio(i) => SeaMessageStream::Stdio(i.stream()),
280            #[cfg(feature = "backend-file")]
281            SeaConsumerBackend::File(i) => SeaMessageStream::File(i.stream()),
282        }
283    }
284}
285
286impl<'a> Future for NextFuture<'a> {
287    type Output = StreamResult<SeaMessage<'a>, BackendErr>;
288
289    fn poll(
290        self: Pin<&mut Self>,
291        cx: &mut std::task::Context<'_>,
292    ) -> std::task::Poll<Self::Output> {
293        match Pin::into_inner(self) {
294            #[cfg(feature = "backend-kafka")]
295            Self::Kafka(fut) => match Pin::new(fut).poll_unpin(cx) {
296                Poll::Ready(res) => Poll::Ready(res.map(SeaMessage::Kafka).map_err(map_err)),
297                Poll::Pending => Poll::Pending,
298            },
299            #[cfg(feature = "backend-redis")]
300            Self::Redis(fut) => match Pin::new(fut).poll_unpin(cx) {
301                Poll::Ready(res) => Poll::Ready(res.map(SeaMessage::Redis).map_err(map_err)),
302                Poll::Pending => Poll::Pending,
303            },
304            #[cfg(feature = "backend-stdio")]
305            Self::Stdio(fut) => match Pin::new(fut).poll_unpin(cx) {
306                Poll::Ready(res) => Poll::Ready(res.map(SeaMessage::Stdio).map_err(map_err)),
307                Poll::Pending => Poll::Pending,
308            },
309            #[cfg(feature = "backend-file")]
310            Self::File(fut) => match Pin::new(fut).poll_unpin(cx) {
311                Poll::Ready(res) => Poll::Ready(res.map(SeaMessage::File).map_err(map_err)),
312                Poll::Pending => Poll::Pending,
313            },
314        }
315    }
316}
317
318impl<'a> Stream for SeaMessageStream<'a> {
319    type Item = StreamResult<SeaMessage<'a>, BackendErr>;
320
321    fn poll_next(
322        self: Pin<&mut Self>,
323        cx: &mut std::task::Context<'_>,
324    ) -> Poll<Option<Self::Item>> {
325        match Pin::into_inner(self) {
326            #[cfg(feature = "backend-kafka")]
327            Self::Kafka(ss) => match Pin::new(ss).poll_next(cx) {
328                Poll::Ready(Some(res)) => {
329                    Poll::Ready(Some(res.map(SeaMessage::Kafka).map_err(map_err)))
330                }
331                Poll::Ready(None) => Poll::Ready(None),
332                Poll::Pending => Poll::Pending,
333            },
334            #[cfg(feature = "backend-redis")]
335            Self::Redis(ss) => match Pin::new(ss).poll_next(cx) {
336                Poll::Ready(Some(res)) => {
337                    Poll::Ready(Some(res.map(SeaMessage::Redis).map_err(map_err)))
338                }
339                Poll::Ready(None) => Poll::Ready(None),
340                Poll::Pending => Poll::Pending,
341            },
342            #[cfg(feature = "backend-stdio")]
343            Self::Stdio(ss) => match Pin::new(ss).poll_next(cx) {
344                Poll::Ready(Some(res)) => {
345                    Poll::Ready(Some(res.map(SeaMessage::Stdio).map_err(map_err)))
346                }
347                Poll::Ready(None) => Poll::Ready(None),
348                Poll::Pending => Poll::Pending,
349            },
350            #[cfg(feature = "backend-file")]
351            Self::File(ss) => match Pin::new(ss).poll_next(cx) {
352                Poll::Ready(Some(res)) => {
353                    Poll::Ready(Some(res.map(SeaMessage::File).map_err(map_err)))
354                }
355                Poll::Ready(None) => Poll::Ready(None),
356                Poll::Pending => Poll::Pending,
357            },
358        }
359    }
360}