sea_streamer_socket/
producer.rs

1#[cfg(feature = "backend-file")]
2use sea_streamer_file::FileProducer;
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::KafkaProducer;
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::RedisProducer;
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioProducer;
9
10use crate::{map_err, Backend, BackendErr, SeaResult, SeaStreamerBackend};
11use sea_streamer_types::{
12    export::futures::FutureExt, Buffer, Producer, Receipt, StreamKey, StreamResult,
13};
14use std::{future::Future, pin::Pin, task::Poll};
15
16#[derive(Debug, Clone)]
17/// `sea-streamer-socket` concrete type of Producer.
18pub struct SeaProducer {
19    pub(crate) backend: SeaProducerBackend,
20}
21
/// Backend-specific producer held inside a `SeaProducer`.
///
/// Each variant is compiled in only when the matching `backend-*` Cargo
/// feature is enabled.
#[derive(Clone, Debug)]
pub(crate) enum SeaProducerBackend {
    /// Producer provided by `sea_streamer_kafka`.
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaProducer),
    /// Producer provided by `sea_streamer_redis`.
    #[cfg(feature = "backend-redis")]
    Redis(RedisProducer),
    /// Producer provided by `sea_streamer_stdio`.
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioProducer),
    /// Producer provided by `sea_streamer_file`.
    #[cfg(feature = "backend-file")]
    File(FileProducer),
}
33
#[cfg(feature = "backend-kafka")]
impl From<KafkaProducer> for SeaProducer {
    /// Wraps a Kafka producer in the backend-agnostic `SeaProducer`.
    fn from(producer: KafkaProducer) -> Self {
        let backend = SeaProducerBackend::Kafka(producer);
        Self { backend }
    }
}
42
#[cfg(feature = "backend-redis")]
impl From<RedisProducer> for SeaProducer {
    /// Wraps a Redis producer in the backend-agnostic `SeaProducer`.
    fn from(producer: RedisProducer) -> Self {
        let backend = SeaProducerBackend::Redis(producer);
        Self { backend }
    }
}
51
#[cfg(feature = "backend-stdio")]
impl From<StdioProducer> for SeaProducer {
    /// Wraps a Stdio producer in the backend-agnostic `SeaProducer`.
    fn from(producer: StdioProducer) -> Self {
        let backend = SeaProducerBackend::Stdio(producer);
        Self { backend }
    }
}
60
#[cfg(feature = "backend-file")]
impl From<FileProducer> for SeaProducer {
    /// Wraps a File producer in the backend-agnostic `SeaProducer`.
    fn from(producer: FileProducer) -> Self {
        let backend = SeaProducerBackend::File(producer);
        Self { backend }
    }
}
69
70impl SeaStreamerBackend for SeaProducer {
71    #[cfg(feature = "backend-kafka")]
72    type Kafka = KafkaProducer;
73    #[cfg(feature = "backend-redis")]
74    type Redis = RedisProducer;
75    #[cfg(feature = "backend-stdio")]
76    type Stdio = StdioProducer;
77    #[cfg(feature = "backend-file")]
78    type File = FileProducer;
79
80    fn backend(&self) -> Backend {
81        match self.backend {
82            #[cfg(feature = "backend-kafka")]
83            SeaProducerBackend::Kafka(_) => Backend::Kafka,
84            #[cfg(feature = "backend-redis")]
85            SeaProducerBackend::Redis(_) => Backend::Redis,
86            #[cfg(feature = "backend-stdio")]
87            SeaProducerBackend::Stdio(_) => Backend::Stdio,
88            #[cfg(feature = "backend-file")]
89            SeaProducerBackend::File(_) => Backend::File,
90        }
91    }
92
93    #[cfg(feature = "backend-kafka")]
94    fn get_kafka(&mut self) -> Option<&mut Self::Kafka> {
95        match &mut self.backend {
96            SeaProducerBackend::Kafka(s) => Some(s),
97            #[cfg(feature = "backend-redis")]
98            SeaProducerBackend::Redis(_) => None,
99            #[cfg(feature = "backend-stdio")]
100            SeaProducerBackend::Stdio(_) => None,
101            #[cfg(feature = "backend-file")]
102            SeaProducerBackend::File(_) => None,
103        }
104    }
105
106    #[cfg(feature = "backend-redis")]
107    fn get_redis(&mut self) -> Option<&mut Self::Redis> {
108        match &mut self.backend {
109            #[cfg(feature = "backend-kafka")]
110            SeaProducerBackend::Kafka(_) => None,
111            SeaProducerBackend::Redis(s) => Some(s),
112            #[cfg(feature = "backend-stdio")]
113            SeaProducerBackend::Stdio(_) => None,
114            #[cfg(feature = "backend-file")]
115            SeaProducerBackend::File(_) => None,
116        }
117    }
118
119    #[cfg(feature = "backend-stdio")]
120    fn get_stdio(&mut self) -> Option<&mut Self::Stdio> {
121        match &mut self.backend {
122            #[cfg(feature = "backend-kafka")]
123            SeaProducerBackend::Kafka(_) => None,
124            #[cfg(feature = "backend-redis")]
125            SeaProducerBackend::Redis(_) => None,
126            SeaProducerBackend::Stdio(s) => Some(s),
127            #[cfg(feature = "backend-file")]
128            SeaProducerBackend::File(_) => None,
129        }
130    }
131
132    #[cfg(feature = "backend-file")]
133    fn get_file(&mut self) -> Option<&mut Self::File> {
134        match &mut self.backend {
135            #[cfg(feature = "backend-kafka")]
136            SeaProducerBackend::Kafka(_) => None,
137            #[cfg(feature = "backend-redis")]
138            SeaProducerBackend::Redis(_) => None,
139            #[cfg(feature = "backend-stdio")]
140            SeaProducerBackend::Stdio(_) => None,
141            SeaProducerBackend::File(s) => Some(s),
142        }
143    }
144}
145
/// `sea-streamer-socket` concrete type of a Future that will yield a send receipt.
///
/// Each variant wraps the `SendFuture` type of one backend crate and is
/// compiled in only when the matching `backend-*` Cargo feature is enabled.
#[derive(Debug)]
pub enum SendFuture {
    /// Wraps `sea_streamer_kafka::SendFuture`.
    #[cfg(feature = "backend-kafka")]
    Kafka(sea_streamer_kafka::SendFuture),
    /// Wraps `sea_streamer_redis::SendFuture`.
    #[cfg(feature = "backend-redis")]
    Redis(sea_streamer_redis::SendFuture),
    /// Wraps `sea_streamer_stdio::SendFuture`.
    #[cfg(feature = "backend-stdio")]
    Stdio(sea_streamer_stdio::SendFuture),
    /// Wraps `sea_streamer_file::SendFuture`.
    #[cfg(feature = "backend-file")]
    File(sea_streamer_file::SendFuture),
}
158
159impl Producer for SeaProducer {
160    type Error = BackendErr;
161
162    type SendFuture = SendFuture;
163
164    fn send_to<S: Buffer>(&self, stream: &StreamKey, payload: S) -> SeaResult<Self::SendFuture> {
165        Ok(match &self.backend {
166            #[cfg(feature = "backend-kafka")]
167            SeaProducerBackend::Kafka(i) => {
168                SendFuture::Kafka(i.send_to(stream, payload).map_err(map_err)?)
169            }
170            #[cfg(feature = "backend-redis")]
171            SeaProducerBackend::Redis(i) => {
172                SendFuture::Redis(i.send_to(stream, payload).map_err(map_err)?)
173            }
174            #[cfg(feature = "backend-stdio")]
175            SeaProducerBackend::Stdio(i) => {
176                SendFuture::Stdio(i.send_to(stream, payload).map_err(map_err)?)
177            }
178            #[cfg(feature = "backend-file")]
179            SeaProducerBackend::File(i) => {
180                SendFuture::File(i.send_to(stream, payload).map_err(map_err)?)
181            }
182        })
183    }
184
185    async fn end(self) -> SeaResult<()> {
186        match self.backend {
187            #[cfg(feature = "backend-kafka")]
188            SeaProducerBackend::Kafka(i) => i.end().await.map_err(map_err),
189            #[cfg(feature = "backend-redis")]
190            SeaProducerBackend::Redis(i) => i.end().await.map_err(map_err),
191            #[cfg(feature = "backend-stdio")]
192            SeaProducerBackend::Stdio(i) => i.end().await.map_err(map_err),
193            #[cfg(feature = "backend-file")]
194            SeaProducerBackend::File(i) => i.end().await.map_err(map_err),
195        }
196    }
197
198    async fn flush(&mut self) -> SeaResult<()> {
199        match &mut self.backend {
200            #[cfg(feature = "backend-kafka")]
201            SeaProducerBackend::Kafka(i) => i.flush().await.map_err(map_err),
202            #[cfg(feature = "backend-redis")]
203            SeaProducerBackend::Redis(i) => i.flush().await.map_err(map_err),
204            #[cfg(feature = "backend-stdio")]
205            SeaProducerBackend::Stdio(i) => i.flush().await.map_err(map_err),
206            #[cfg(feature = "backend-file")]
207            SeaProducerBackend::File(i) => i.flush().await.map_err(map_err),
208        }
209    }
210
211    fn anchor(&mut self, stream: StreamKey) -> SeaResult<()> {
212        match &mut self.backend {
213            #[cfg(feature = "backend-kafka")]
214            SeaProducerBackend::Kafka(i) => i.anchor(stream).map_err(map_err),
215            #[cfg(feature = "backend-redis")]
216            SeaProducerBackend::Redis(i) => i.anchor(stream).map_err(map_err),
217            #[cfg(feature = "backend-stdio")]
218            SeaProducerBackend::Stdio(i) => i.anchor(stream).map_err(map_err),
219            #[cfg(feature = "backend-file")]
220            SeaProducerBackend::File(i) => i.anchor(stream).map_err(map_err),
221        }
222    }
223
224    fn anchored(&self) -> SeaResult<&StreamKey> {
225        match &self.backend {
226            #[cfg(feature = "backend-kafka")]
227            SeaProducerBackend::Kafka(i) => i.anchored().map_err(map_err),
228            #[cfg(feature = "backend-redis")]
229            SeaProducerBackend::Redis(i) => i.anchored().map_err(map_err),
230            #[cfg(feature = "backend-stdio")]
231            SeaProducerBackend::Stdio(i) => i.anchored().map_err(map_err),
232            #[cfg(feature = "backend-file")]
233            SeaProducerBackend::File(i) => i.anchored().map_err(map_err),
234        }
235    }
236}
237
238impl Future for SendFuture {
239    type Output = StreamResult<Receipt, BackendErr>;
240
241    fn poll(
242        self: Pin<&mut Self>,
243        cx: &mut std::task::Context<'_>,
244    ) -> std::task::Poll<Self::Output> {
245        match Pin::into_inner(self) {
246            #[cfg(feature = "backend-kafka")]
247            Self::Kafka(fut) => match Pin::new(fut).poll_unpin(cx) {
248                Poll::Ready(res) => Poll::Ready(res.map_err(map_err)),
249                Poll::Pending => Poll::Pending,
250            },
251            #[cfg(feature = "backend-redis")]
252            Self::Redis(fut) => match Pin::new(fut).poll_unpin(cx) {
253                Poll::Ready(res) => Poll::Ready(res.map_err(map_err)),
254                Poll::Pending => Poll::Pending,
255            },
256            #[cfg(feature = "backend-stdio")]
257            Self::Stdio(fut) => match Pin::new(fut).poll_unpin(cx) {
258                Poll::Ready(res) => Poll::Ready(res.map_err(map_err)),
259                Poll::Pending => Poll::Pending,
260            },
261            #[cfg(feature = "backend-file")]
262            Self::File(fut) => match Pin::new(fut).poll_unpin(cx) {
263                Poll::Ready(res) => Poll::Ready(res.map_err(map_err)),
264                Poll::Pending => Poll::Pending,
265            },
266        }
267    }
268}