1#[cfg(feature = "backend-file")]
2use sea_streamer_file::FileStreamer;
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::KafkaStreamer;
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::RedisStreamer;
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioStreamer;
9
10use sea_streamer_types::{StreamErr, StreamKey, Streamer, StreamerUri};
11
12use crate::{
13 map_err, Backend, BackendErr, SeaConnectOptions, SeaConsumer, SeaConsumerBackend,
14 SeaConsumerOptions, SeaProducer, SeaProducerBackend, SeaProducerOptions, SeaResult,
15 SeaStreamerBackend,
16};
17
18#[derive(Debug)]
19pub struct SeaStreamer {
21 backend: SeaStreamerInner,
22}
23
24#[derive(Debug, Clone)]
25pub(crate) enum SeaStreamerInner {
26 #[cfg(feature = "backend-kafka")]
27 Kafka(KafkaStreamer),
28 #[cfg(feature = "backend-redis")]
29 Redis(RedisStreamer),
30 #[cfg(feature = "backend-stdio")]
31 Stdio(StdioStreamer),
32 #[cfg(feature = "backend-file")]
33 File(FileStreamer),
34}
35
36#[cfg(feature = "backend-kafka")]
37impl From<KafkaStreamer> for SeaStreamer {
38 fn from(i: KafkaStreamer) -> Self {
39 Self {
40 backend: SeaStreamerInner::Kafka(i),
41 }
42 }
43}
44
45#[cfg(feature = "backend-redis")]
46impl From<RedisStreamer> for SeaStreamer {
47 fn from(i: RedisStreamer) -> Self {
48 Self {
49 backend: SeaStreamerInner::Redis(i),
50 }
51 }
52}
53
54#[cfg(feature = "backend-stdio")]
55impl From<StdioStreamer> for SeaStreamer {
56 fn from(i: StdioStreamer) -> Self {
57 Self {
58 backend: SeaStreamerInner::Stdio(i),
59 }
60 }
61}
62
63#[cfg(feature = "backend-file")]
64impl From<FileStreamer> for SeaStreamer {
65 fn from(i: FileStreamer) -> Self {
66 Self {
67 backend: SeaStreamerInner::File(i),
68 }
69 }
70}
71
72impl SeaStreamerBackend for SeaStreamer {
73 #[cfg(feature = "backend-kafka")]
74 type Kafka = KafkaStreamer;
75 #[cfg(feature = "backend-redis")]
76 type Redis = RedisStreamer;
77 #[cfg(feature = "backend-stdio")]
78 type Stdio = StdioStreamer;
79 #[cfg(feature = "backend-file")]
80 type File = FileStreamer;
81
82 fn backend(&self) -> Backend {
83 match self.backend {
84 #[cfg(feature = "backend-kafka")]
85 SeaStreamerInner::Kafka(_) => Backend::Kafka,
86 #[cfg(feature = "backend-redis")]
87 SeaStreamerInner::Redis(_) => Backend::Redis,
88 #[cfg(feature = "backend-stdio")]
89 SeaStreamerInner::Stdio(_) => Backend::Stdio,
90 #[cfg(feature = "backend-file")]
91 SeaStreamerInner::File(_) => Backend::File,
92 }
93 }
94
95 #[cfg(feature = "backend-kafka")]
96 fn get_kafka(&mut self) -> Option<&mut Self::Kafka> {
97 match &mut self.backend {
98 SeaStreamerInner::Kafka(s) => Some(s),
99 #[cfg(feature = "backend-redis")]
100 SeaStreamerInner::Redis(_) => None,
101 #[cfg(feature = "backend-stdio")]
102 SeaStreamerInner::Stdio(_) => None,
103 #[cfg(feature = "backend-file")]
104 SeaStreamerInner::File(_) => None,
105 }
106 }
107
108 #[cfg(feature = "backend-redis")]
109 fn get_redis(&mut self) -> Option<&mut Self::Redis> {
110 match &mut self.backend {
111 #[cfg(feature = "backend-kafka")]
112 SeaStreamerInner::Kafka(_) => None,
113 SeaStreamerInner::Redis(s) => Some(s),
114 #[cfg(feature = "backend-stdio")]
115 SeaStreamerInner::Stdio(_) => None,
116 #[cfg(feature = "backend-file")]
117 SeaStreamerInner::File(_) => None,
118 }
119 }
120
121 #[cfg(feature = "backend-stdio")]
122 fn get_stdio(&mut self) -> Option<&mut Self::Stdio> {
123 match &mut self.backend {
124 #[cfg(feature = "backend-kafka")]
125 SeaStreamerInner::Kafka(_) => None,
126 #[cfg(feature = "backend-redis")]
127 SeaStreamerInner::Redis(_) => None,
128 SeaStreamerInner::Stdio(s) => Some(s),
129 #[cfg(feature = "backend-file")]
130 SeaStreamerInner::File(_) => None,
131 }
132 }
133
134 #[cfg(feature = "backend-file")]
135 fn get_file(&mut self) -> Option<&mut Self::File> {
136 match &mut self.backend {
137 #[cfg(feature = "backend-kafka")]
138 SeaStreamerInner::Kafka(_) => None,
139 #[cfg(feature = "backend-redis")]
140 SeaStreamerInner::Redis(_) => None,
141 #[cfg(feature = "backend-stdio")]
142 SeaStreamerInner::Stdio(_) => None,
143 SeaStreamerInner::File(s) => Some(s),
144 }
145 }
146}
147
148impl Streamer for SeaStreamer {
149 type Error = BackendErr;
150 type Producer = SeaProducer;
151 type Consumer = SeaConsumer;
152 type ConnectOptions = SeaConnectOptions;
153 type ConsumerOptions = SeaConsumerOptions;
154 type ProducerOptions = SeaProducerOptions;
155
156 async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> SeaResult<Self> {
157 let backend = match uri.protocol() {
158 Some(protocol) => match protocol {
159 #[cfg(feature = "backend-kafka")]
160 "kafka" => SeaStreamerInner::Kafka(
161 KafkaStreamer::connect(uri, options.into_kafka_connect_options())
162 .await
163 .map_err(map_err)?,
164 ),
165 #[cfg(feature = "backend-redis")]
166 "redis" | "rediss" => SeaStreamerInner::Redis(
167 RedisStreamer::connect(uri, options.into_redis_connect_options())
168 .await
169 .map_err(map_err)?,
170 ),
171 #[cfg(feature = "backend-stdio")]
172 "stdio" => SeaStreamerInner::Stdio(
173 StdioStreamer::connect(uri, options.into_stdio_connect_options())
174 .await
175 .map_err(map_err)?,
176 ),
177 #[cfg(feature = "backend-file")]
178 "file" => SeaStreamerInner::File(
179 FileStreamer::connect(uri, options.into_file_connect_options())
180 .await
181 .map_err(map_err)?,
182 ),
183 _ => {
184 return Err(StreamErr::Connect(format!("unknown protocol `{protocol}`")));
185 }
186 },
187 None => {
188 return Err(StreamErr::Connect("protocol not set".to_owned()));
189 }
190 };
191 Ok(SeaStreamer { backend })
192 }
193
194 async fn disconnect(self) -> SeaResult<()> {
195 match self.backend {
196 #[cfg(feature = "backend-kafka")]
197 SeaStreamerInner::Kafka(i) => i.disconnect().await.map_err(map_err),
198 #[cfg(feature = "backend-redis")]
199 SeaStreamerInner::Redis(i) => i.disconnect().await.map_err(map_err),
200 #[cfg(feature = "backend-stdio")]
201 SeaStreamerInner::Stdio(i) => i.disconnect().await.map_err(map_err),
202 #[cfg(feature = "backend-file")]
203 SeaStreamerInner::File(i) => i.disconnect().await.map_err(map_err),
204 }
205 }
206
207 async fn create_generic_producer(
208 &self,
209 options: Self::ProducerOptions,
210 ) -> SeaResult<Self::Producer> {
211 let backend = match &self.backend {
212 #[cfg(feature = "backend-kafka")]
213 SeaStreamerInner::Kafka(i) => SeaProducerBackend::Kafka(
214 i.create_generic_producer(options.into_kafka_producer_options())
215 .await
216 .map_err(map_err)?,
217 ),
218 #[cfg(feature = "backend-redis")]
219 SeaStreamerInner::Redis(i) => SeaProducerBackend::Redis(
220 i.create_generic_producer(options.into_redis_producer_options())
221 .await
222 .map_err(map_err)?,
223 ),
224 #[cfg(feature = "backend-stdio")]
225 SeaStreamerInner::Stdio(i) => SeaProducerBackend::Stdio(
226 i.create_generic_producer(options.into_stdio_producer_options())
227 .await
228 .map_err(map_err)?,
229 ),
230 #[cfg(feature = "backend-file")]
231 SeaStreamerInner::File(i) => SeaProducerBackend::File(
232 i.create_generic_producer(options.into_file_producer_options())
233 .await
234 .map_err(map_err)?,
235 ),
236 };
237 Ok(SeaProducer { backend })
238 }
239
240 async fn create_consumer(
241 &self,
242 streams: &[StreamKey],
243 options: Self::ConsumerOptions,
244 ) -> SeaResult<Self::Consumer> {
245 let backend = match &self.backend {
246 #[cfg(feature = "backend-kafka")]
247 SeaStreamerInner::Kafka(i) => SeaConsumerBackend::Kafka(
248 i.create_consumer(streams, options.into_kafka_consumer_options())
249 .await
250 .map_err(map_err)?,
251 ),
252 #[cfg(feature = "backend-redis")]
253 SeaStreamerInner::Redis(i) => SeaConsumerBackend::Redis(
254 i.create_consumer(streams, options.into_redis_consumer_options())
255 .await
256 .map_err(map_err)?,
257 ),
258 #[cfg(feature = "backend-stdio")]
259 SeaStreamerInner::Stdio(i) => SeaConsumerBackend::Stdio(
260 i.create_consumer(streams, options.into_stdio_consumer_options())
261 .await
262 .map_err(map_err)?,
263 ),
264 #[cfg(feature = "backend-file")]
265 SeaStreamerInner::File(i) => SeaConsumerBackend::File(
266 i.create_consumer(streams, options.into_file_consumer_options())
267 .await
268 .map_err(map_err)?,
269 ),
270 };
271 Ok(SeaConsumer { backend })
272 }
273}