sea_streamer_socket/
producer.rs1#[cfg(feature = "backend-file")]
2use sea_streamer_file::FileProducer;
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::KafkaProducer;
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::RedisProducer;
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioProducer;
9
10use crate::{map_err, Backend, BackendErr, SeaResult, SeaStreamerBackend};
11use sea_streamer_types::{
12 export::futures::FutureExt, Buffer, Producer, Receipt, StreamKey, StreamResult,
13};
14use std::{future::Future, pin::Pin, task::Poll};
15
16#[derive(Debug, Clone)]
17pub struct SeaProducer {
19 pub(crate) backend: SeaProducerBackend,
20}
21
/// Crate-internal sum type holding the concrete producer of whichever backend is in use.
///
/// Each variant only exists when the corresponding `backend-*` cargo feature is enabled.
#[derive(Clone, Debug)]
pub(crate) enum SeaProducerBackend {
    /// Producer from `sea_streamer_kafka`.
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaProducer),
    /// Producer from `sea_streamer_redis`.
    #[cfg(feature = "backend-redis")]
    Redis(RedisProducer),
    /// Producer from `sea_streamer_stdio`.
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioProducer),
    /// Producer from `sea_streamer_file`.
    #[cfg(feature = "backend-file")]
    File(FileProducer),
}
33
/// Wraps a [`KafkaProducer`] in a [`SeaProducer`] (requires the `backend-kafka` feature).
#[cfg(feature = "backend-kafka")]
impl From<KafkaProducer> for SeaProducer {
    fn from(producer: KafkaProducer) -> Self {
        let backend = SeaProducerBackend::Kafka(producer);
        SeaProducer { backend }
    }
}
42
/// Wraps a [`RedisProducer`] in a [`SeaProducer`] (requires the `backend-redis` feature).
#[cfg(feature = "backend-redis")]
impl From<RedisProducer> for SeaProducer {
    fn from(producer: RedisProducer) -> Self {
        let backend = SeaProducerBackend::Redis(producer);
        SeaProducer { backend }
    }
}
51
/// Wraps a [`StdioProducer`] in a [`SeaProducer`] (requires the `backend-stdio` feature).
#[cfg(feature = "backend-stdio")]
impl From<StdioProducer> for SeaProducer {
    fn from(producer: StdioProducer) -> Self {
        let backend = SeaProducerBackend::Stdio(producer);
        SeaProducer { backend }
    }
}
60
/// Wraps a [`FileProducer`] in a [`SeaProducer`] (requires the `backend-file` feature).
#[cfg(feature = "backend-file")]
impl From<FileProducer> for SeaProducer {
    fn from(producer: FileProducer) -> Self {
        let backend = SeaProducerBackend::File(producer);
        SeaProducer { backend }
    }
}
69
70impl SeaStreamerBackend for SeaProducer {
71 #[cfg(feature = "backend-kafka")]
72 type Kafka = KafkaProducer;
73 #[cfg(feature = "backend-redis")]
74 type Redis = RedisProducer;
75 #[cfg(feature = "backend-stdio")]
76 type Stdio = StdioProducer;
77 #[cfg(feature = "backend-file")]
78 type File = FileProducer;
79
80 fn backend(&self) -> Backend {
81 match self.backend {
82 #[cfg(feature = "backend-kafka")]
83 SeaProducerBackend::Kafka(_) => Backend::Kafka,
84 #[cfg(feature = "backend-redis")]
85 SeaProducerBackend::Redis(_) => Backend::Redis,
86 #[cfg(feature = "backend-stdio")]
87 SeaProducerBackend::Stdio(_) => Backend::Stdio,
88 #[cfg(feature = "backend-file")]
89 SeaProducerBackend::File(_) => Backend::File,
90 }
91 }
92
93 #[cfg(feature = "backend-kafka")]
94 fn get_kafka(&mut self) -> Option<&mut Self::Kafka> {
95 match &mut self.backend {
96 SeaProducerBackend::Kafka(s) => Some(s),
97 #[cfg(feature = "backend-redis")]
98 SeaProducerBackend::Redis(_) => None,
99 #[cfg(feature = "backend-stdio")]
100 SeaProducerBackend::Stdio(_) => None,
101 #[cfg(feature = "backend-file")]
102 SeaProducerBackend::File(_) => None,
103 }
104 }
105
106 #[cfg(feature = "backend-redis")]
107 fn get_redis(&mut self) -> Option<&mut Self::Redis> {
108 match &mut self.backend {
109 #[cfg(feature = "backend-kafka")]
110 SeaProducerBackend::Kafka(_) => None,
111 SeaProducerBackend::Redis(s) => Some(s),
112 #[cfg(feature = "backend-stdio")]
113 SeaProducerBackend::Stdio(_) => None,
114 #[cfg(feature = "backend-file")]
115 SeaProducerBackend::File(_) => None,
116 }
117 }
118
119 #[cfg(feature = "backend-stdio")]
120 fn get_stdio(&mut self) -> Option<&mut Self::Stdio> {
121 match &mut self.backend {
122 #[cfg(feature = "backend-kafka")]
123 SeaProducerBackend::Kafka(_) => None,
124 #[cfg(feature = "backend-redis")]
125 SeaProducerBackend::Redis(_) => None,
126 SeaProducerBackend::Stdio(s) => Some(s),
127 #[cfg(feature = "backend-file")]
128 SeaProducerBackend::File(_) => None,
129 }
130 }
131
132 #[cfg(feature = "backend-file")]
133 fn get_file(&mut self) -> Option<&mut Self::File> {
134 match &mut self.backend {
135 #[cfg(feature = "backend-kafka")]
136 SeaProducerBackend::Kafka(_) => None,
137 #[cfg(feature = "backend-redis")]
138 SeaProducerBackend::Redis(_) => None,
139 #[cfg(feature = "backend-stdio")]
140 SeaProducerBackend::Stdio(_) => None,
141 SeaProducerBackend::File(s) => Some(s),
142 }
143 }
144}
145
/// Future returned by [`SeaProducer`]'s `send_to`, wrapping the send future of
/// whichever backend produced it.
///
/// Each variant only exists when the corresponding `backend-*` cargo feature is enabled.
#[derive(Debug)]
pub enum SendFuture {
    /// Send future from `sea_streamer_kafka`.
    #[cfg(feature = "backend-kafka")]
    Kafka(sea_streamer_kafka::SendFuture),
    /// Send future from `sea_streamer_redis`.
    #[cfg(feature = "backend-redis")]
    Redis(sea_streamer_redis::SendFuture),
    /// Send future from `sea_streamer_stdio`.
    #[cfg(feature = "backend-stdio")]
    Stdio(sea_streamer_stdio::SendFuture),
    /// Send future from `sea_streamer_file`.
    #[cfg(feature = "backend-file")]
    File(sea_streamer_file::SendFuture),
}
158
159impl Producer for SeaProducer {
160 type Error = BackendErr;
161
162 type SendFuture = SendFuture;
163
164 fn send_to<S: Buffer>(&self, stream: &StreamKey, payload: S) -> SeaResult<Self::SendFuture> {
165 Ok(match &self.backend {
166 #[cfg(feature = "backend-kafka")]
167 SeaProducerBackend::Kafka(i) => {
168 SendFuture::Kafka(i.send_to(stream, payload).map_err(map_err)?)
169 }
170 #[cfg(feature = "backend-redis")]
171 SeaProducerBackend::Redis(i) => {
172 SendFuture::Redis(i.send_to(stream, payload).map_err(map_err)?)
173 }
174 #[cfg(feature = "backend-stdio")]
175 SeaProducerBackend::Stdio(i) => {
176 SendFuture::Stdio(i.send_to(stream, payload).map_err(map_err)?)
177 }
178 #[cfg(feature = "backend-file")]
179 SeaProducerBackend::File(i) => {
180 SendFuture::File(i.send_to(stream, payload).map_err(map_err)?)
181 }
182 })
183 }
184
185 async fn end(self) -> SeaResult<()> {
186 match self.backend {
187 #[cfg(feature = "backend-kafka")]
188 SeaProducerBackend::Kafka(i) => i.end().await.map_err(map_err),
189 #[cfg(feature = "backend-redis")]
190 SeaProducerBackend::Redis(i) => i.end().await.map_err(map_err),
191 #[cfg(feature = "backend-stdio")]
192 SeaProducerBackend::Stdio(i) => i.end().await.map_err(map_err),
193 #[cfg(feature = "backend-file")]
194 SeaProducerBackend::File(i) => i.end().await.map_err(map_err),
195 }
196 }
197
198 async fn flush(&mut self) -> SeaResult<()> {
199 match &mut self.backend {
200 #[cfg(feature = "backend-kafka")]
201 SeaProducerBackend::Kafka(i) => i.flush().await.map_err(map_err),
202 #[cfg(feature = "backend-redis")]
203 SeaProducerBackend::Redis(i) => i.flush().await.map_err(map_err),
204 #[cfg(feature = "backend-stdio")]
205 SeaProducerBackend::Stdio(i) => i.flush().await.map_err(map_err),
206 #[cfg(feature = "backend-file")]
207 SeaProducerBackend::File(i) => i.flush().await.map_err(map_err),
208 }
209 }
210
211 fn anchor(&mut self, stream: StreamKey) -> SeaResult<()> {
212 match &mut self.backend {
213 #[cfg(feature = "backend-kafka")]
214 SeaProducerBackend::Kafka(i) => i.anchor(stream).map_err(map_err),
215 #[cfg(feature = "backend-redis")]
216 SeaProducerBackend::Redis(i) => i.anchor(stream).map_err(map_err),
217 #[cfg(feature = "backend-stdio")]
218 SeaProducerBackend::Stdio(i) => i.anchor(stream).map_err(map_err),
219 #[cfg(feature = "backend-file")]
220 SeaProducerBackend::File(i) => i.anchor(stream).map_err(map_err),
221 }
222 }
223
224 fn anchored(&self) -> SeaResult<&StreamKey> {
225 match &self.backend {
226 #[cfg(feature = "backend-kafka")]
227 SeaProducerBackend::Kafka(i) => i.anchored().map_err(map_err),
228 #[cfg(feature = "backend-redis")]
229 SeaProducerBackend::Redis(i) => i.anchored().map_err(map_err),
230 #[cfg(feature = "backend-stdio")]
231 SeaProducerBackend::Stdio(i) => i.anchored().map_err(map_err),
232 #[cfg(feature = "backend-file")]
233 SeaProducerBackend::File(i) => i.anchored().map_err(map_err),
234 }
235 }
236}
237
238impl Future for SendFuture {
239 type Output = StreamResult<Receipt, BackendErr>;
240
241 fn poll(
242 self: Pin<&mut Self>,
243 cx: &mut std::task::Context<'_>,
244 ) -> std::task::Poll<Self::Output> {
245 match Pin::into_inner(self) {
246 #[cfg(feature = "backend-kafka")]
247 Self::Kafka(fut) => match Pin::new(fut).poll_unpin(cx) {
248 Poll::Ready(res) => Poll::Ready(res.map_err(map_err)),
249 Poll::Pending => Poll::Pending,
250 },
251 #[cfg(feature = "backend-redis")]
252 Self::Redis(fut) => match Pin::new(fut).poll_unpin(cx) {
253 Poll::Ready(res) => Poll::Ready(res.map_err(map_err)),
254 Poll::Pending => Poll::Pending,
255 },
256 #[cfg(feature = "backend-stdio")]
257 Self::Stdio(fut) => match Pin::new(fut).poll_unpin(cx) {
258 Poll::Ready(res) => Poll::Ready(res.map_err(map_err)),
259 Poll::Pending => Poll::Pending,
260 },
261 #[cfg(feature = "backend-file")]
262 Self::File(fut) => match Pin::new(fut).poll_unpin(cx) {
263 Poll::Ready(res) => Poll::Ready(res.map_err(map_err)),
264 Poll::Pending => Poll::Pending,
265 },
266 }
267 }
268}