sea_streamer_socket/
streamer.rs

1#[cfg(feature = "backend-file")]
2use sea_streamer_file::FileStreamer;
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::KafkaStreamer;
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::RedisStreamer;
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioStreamer;
9
10use sea_streamer_types::{StreamErr, StreamKey, Streamer, StreamerUri};
11
12use crate::{
13    map_err, Backend, BackendErr, SeaConnectOptions, SeaConsumer, SeaConsumerBackend,
14    SeaConsumerOptions, SeaProducer, SeaProducerBackend, SeaProducerOptions, SeaResult,
15    SeaStreamerBackend,
16};
17
/// `sea-streamer-socket` concrete type of Streamer.
///
/// Wraps exactly one backend-specific streamer; which backends are available depends on
/// the `backend-*` cargo features enabled at compile time.
#[derive(Debug)]
pub struct SeaStreamer {
    /// The backend-specific streamer that every operation is dispatched to.
    backend: SeaStreamerInner,
}
23
/// Backend-specific streamer stored inside [`SeaStreamer`].
///
/// Each variant exists only when the matching `backend-*` cargo feature is enabled.
#[derive(Debug, Clone)]
pub(crate) enum SeaStreamerInner {
    /// Kafka streamer (feature `backend-kafka`).
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaStreamer),
    /// Redis streamer (feature `backend-redis`).
    #[cfg(feature = "backend-redis")]
    Redis(RedisStreamer),
    /// Stdio streamer (feature `backend-stdio`).
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioStreamer),
    /// File streamer (feature `backend-file`).
    #[cfg(feature = "backend-file")]
    File(FileStreamer),
}
35
/// Wraps a [`KafkaStreamer`] as the Kafka backend of a [`SeaStreamer`].
#[cfg(feature = "backend-kafka")]
impl From<KafkaStreamer> for SeaStreamer {
    fn from(streamer: KafkaStreamer) -> Self {
        let backend = SeaStreamerInner::Kafka(streamer);
        SeaStreamer { backend }
    }
}
44
/// Wraps a [`RedisStreamer`] as the Redis backend of a [`SeaStreamer`].
#[cfg(feature = "backend-redis")]
impl From<RedisStreamer> for SeaStreamer {
    fn from(streamer: RedisStreamer) -> Self {
        let backend = SeaStreamerInner::Redis(streamer);
        SeaStreamer { backend }
    }
}
53
/// Wraps a [`StdioStreamer`] as the stdio backend of a [`SeaStreamer`].
#[cfg(feature = "backend-stdio")]
impl From<StdioStreamer> for SeaStreamer {
    fn from(streamer: StdioStreamer) -> Self {
        let backend = SeaStreamerInner::Stdio(streamer);
        SeaStreamer { backend }
    }
}
62
/// Wraps a [`FileStreamer`] as the file backend of a [`SeaStreamer`].
#[cfg(feature = "backend-file")]
impl From<FileStreamer> for SeaStreamer {
    fn from(streamer: FileStreamer) -> Self {
        let backend = SeaStreamerInner::File(streamer);
        SeaStreamer { backend }
    }
}
71
72impl SeaStreamerBackend for SeaStreamer {
73    #[cfg(feature = "backend-kafka")]
74    type Kafka = KafkaStreamer;
75    #[cfg(feature = "backend-redis")]
76    type Redis = RedisStreamer;
77    #[cfg(feature = "backend-stdio")]
78    type Stdio = StdioStreamer;
79    #[cfg(feature = "backend-file")]
80    type File = FileStreamer;
81
82    fn backend(&self) -> Backend {
83        match self.backend {
84            #[cfg(feature = "backend-kafka")]
85            SeaStreamerInner::Kafka(_) => Backend::Kafka,
86            #[cfg(feature = "backend-redis")]
87            SeaStreamerInner::Redis(_) => Backend::Redis,
88            #[cfg(feature = "backend-stdio")]
89            SeaStreamerInner::Stdio(_) => Backend::Stdio,
90            #[cfg(feature = "backend-file")]
91            SeaStreamerInner::File(_) => Backend::File,
92        }
93    }
94
95    #[cfg(feature = "backend-kafka")]
96    fn get_kafka(&mut self) -> Option<&mut Self::Kafka> {
97        match &mut self.backend {
98            SeaStreamerInner::Kafka(s) => Some(s),
99            #[cfg(feature = "backend-redis")]
100            SeaStreamerInner::Redis(_) => None,
101            #[cfg(feature = "backend-stdio")]
102            SeaStreamerInner::Stdio(_) => None,
103            #[cfg(feature = "backend-file")]
104            SeaStreamerInner::File(_) => None,
105        }
106    }
107
108    #[cfg(feature = "backend-redis")]
109    fn get_redis(&mut self) -> Option<&mut Self::Redis> {
110        match &mut self.backend {
111            #[cfg(feature = "backend-kafka")]
112            SeaStreamerInner::Kafka(_) => None,
113            SeaStreamerInner::Redis(s) => Some(s),
114            #[cfg(feature = "backend-stdio")]
115            SeaStreamerInner::Stdio(_) => None,
116            #[cfg(feature = "backend-file")]
117            SeaStreamerInner::File(_) => None,
118        }
119    }
120
121    #[cfg(feature = "backend-stdio")]
122    fn get_stdio(&mut self) -> Option<&mut Self::Stdio> {
123        match &mut self.backend {
124            #[cfg(feature = "backend-kafka")]
125            SeaStreamerInner::Kafka(_) => None,
126            #[cfg(feature = "backend-redis")]
127            SeaStreamerInner::Redis(_) => None,
128            SeaStreamerInner::Stdio(s) => Some(s),
129            #[cfg(feature = "backend-file")]
130            SeaStreamerInner::File(_) => None,
131        }
132    }
133
134    #[cfg(feature = "backend-file")]
135    fn get_file(&mut self) -> Option<&mut Self::File> {
136        match &mut self.backend {
137            #[cfg(feature = "backend-kafka")]
138            SeaStreamerInner::Kafka(_) => None,
139            #[cfg(feature = "backend-redis")]
140            SeaStreamerInner::Redis(_) => None,
141            #[cfg(feature = "backend-stdio")]
142            SeaStreamerInner::Stdio(_) => None,
143            SeaStreamerInner::File(s) => Some(s),
144        }
145    }
146}
147
148impl Streamer for SeaStreamer {
149    type Error = BackendErr;
150    type Producer = SeaProducer;
151    type Consumer = SeaConsumer;
152    type ConnectOptions = SeaConnectOptions;
153    type ConsumerOptions = SeaConsumerOptions;
154    type ProducerOptions = SeaProducerOptions;
155
156    async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> SeaResult<Self> {
157        let backend = match uri.protocol() {
158            Some(protocol) => match protocol {
159                #[cfg(feature = "backend-kafka")]
160                "kafka" => SeaStreamerInner::Kafka(
161                    KafkaStreamer::connect(uri, options.into_kafka_connect_options())
162                        .await
163                        .map_err(map_err)?,
164                ),
165                #[cfg(feature = "backend-redis")]
166                "redis" | "rediss" => SeaStreamerInner::Redis(
167                    RedisStreamer::connect(uri, options.into_redis_connect_options())
168                        .await
169                        .map_err(map_err)?,
170                ),
171                #[cfg(feature = "backend-stdio")]
172                "stdio" => SeaStreamerInner::Stdio(
173                    StdioStreamer::connect(uri, options.into_stdio_connect_options())
174                        .await
175                        .map_err(map_err)?,
176                ),
177                #[cfg(feature = "backend-file")]
178                "file" => SeaStreamerInner::File(
179                    FileStreamer::connect(uri, options.into_file_connect_options())
180                        .await
181                        .map_err(map_err)?,
182                ),
183                _ => {
184                    return Err(StreamErr::Connect(format!("unknown protocol `{protocol}`")));
185                }
186            },
187            None => {
188                return Err(StreamErr::Connect("protocol not set".to_owned()));
189            }
190        };
191        Ok(SeaStreamer { backend })
192    }
193
194    async fn disconnect(self) -> SeaResult<()> {
195        match self.backend {
196            #[cfg(feature = "backend-kafka")]
197            SeaStreamerInner::Kafka(i) => i.disconnect().await.map_err(map_err),
198            #[cfg(feature = "backend-redis")]
199            SeaStreamerInner::Redis(i) => i.disconnect().await.map_err(map_err),
200            #[cfg(feature = "backend-stdio")]
201            SeaStreamerInner::Stdio(i) => i.disconnect().await.map_err(map_err),
202            #[cfg(feature = "backend-file")]
203            SeaStreamerInner::File(i) => i.disconnect().await.map_err(map_err),
204        }
205    }
206
207    async fn create_generic_producer(
208        &self,
209        options: Self::ProducerOptions,
210    ) -> SeaResult<Self::Producer> {
211        let backend = match &self.backend {
212            #[cfg(feature = "backend-kafka")]
213            SeaStreamerInner::Kafka(i) => SeaProducerBackend::Kafka(
214                i.create_generic_producer(options.into_kafka_producer_options())
215                    .await
216                    .map_err(map_err)?,
217            ),
218            #[cfg(feature = "backend-redis")]
219            SeaStreamerInner::Redis(i) => SeaProducerBackend::Redis(
220                i.create_generic_producer(options.into_redis_producer_options())
221                    .await
222                    .map_err(map_err)?,
223            ),
224            #[cfg(feature = "backend-stdio")]
225            SeaStreamerInner::Stdio(i) => SeaProducerBackend::Stdio(
226                i.create_generic_producer(options.into_stdio_producer_options())
227                    .await
228                    .map_err(map_err)?,
229            ),
230            #[cfg(feature = "backend-file")]
231            SeaStreamerInner::File(i) => SeaProducerBackend::File(
232                i.create_generic_producer(options.into_file_producer_options())
233                    .await
234                    .map_err(map_err)?,
235            ),
236        };
237        Ok(SeaProducer { backend })
238    }
239
240    async fn create_consumer(
241        &self,
242        streams: &[StreamKey],
243        options: Self::ConsumerOptions,
244    ) -> SeaResult<Self::Consumer> {
245        let backend = match &self.backend {
246            #[cfg(feature = "backend-kafka")]
247            SeaStreamerInner::Kafka(i) => SeaConsumerBackend::Kafka(
248                i.create_consumer(streams, options.into_kafka_consumer_options())
249                    .await
250                    .map_err(map_err)?,
251            ),
252            #[cfg(feature = "backend-redis")]
253            SeaStreamerInner::Redis(i) => SeaConsumerBackend::Redis(
254                i.create_consumer(streams, options.into_redis_consumer_options())
255                    .await
256                    .map_err(map_err)?,
257            ),
258            #[cfg(feature = "backend-stdio")]
259            SeaStreamerInner::Stdio(i) => SeaConsumerBackend::Stdio(
260                i.create_consumer(streams, options.into_stdio_consumer_options())
261                    .await
262                    .map_err(map_err)?,
263            ),
264            #[cfg(feature = "backend-file")]
265            SeaStreamerInner::File(i) => SeaConsumerBackend::File(
266                i.create_consumer(streams, options.into_file_consumer_options())
267                    .await
268                    .map_err(map_err)?,
269            ),
270        };
271        Ok(SeaConsumer { backend })
272    }
273}