sea_streamer_socket/
consumer_options.rs1#[cfg(feature = "backend-file")]
2use sea_streamer_file::{AutoStreamReset as FileAutoStreamReset, FileConsumerOptions};
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::{AutoOffsetReset, KafkaConsumerOptions};
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::{AutoStreamReset as RedisAutoStreamReset, RedisConsumerOptions};
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioConsumerOptions;
9
10use crate::{map_err, BackendErr, SeaResult};
11use sea_streamer_types::{ConsumerGroup, ConsumerMode, ConsumerOptions};
12
/// Bundle of consumer options, holding one set of options per backend.
///
/// Each field exists only when the matching `backend-*` cargo feature is
/// enabled, so the struct's shape depends on the features selected at build time.
#[derive(Clone, Debug, Default)]
pub struct SeaConsumerOptions {
    /// Options for the Kafka backend.
    #[cfg(feature = "backend-kafka")]
    kafka: KafkaConsumerOptions,
    /// Options for the Redis backend.
    #[cfg(feature = "backend-redis")]
    redis: RedisConsumerOptions,
    /// Options for the Stdio backend.
    #[cfg(feature = "backend-stdio")]
    stdio: StdioConsumerOptions,
    /// Options for the File backend.
    #[cfg(feature = "backend-file")]
    file: FileConsumerOptions,
}
25
/// Backend-agnostic choice of where a stream should be reset to.
///
/// `SeaConsumerOptions::set_auto_stream_reset` translates this into the
/// equivalent `Earliest` / `Latest` value of each enabled backend.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SeaStreamReset {
    /// Mapped to each backend's `Earliest` variant.
    Earliest,
    /// Mapped to each backend's `Latest` variant.
    Latest,
}
31
32impl SeaConsumerOptions {
33 #[cfg(feature = "backend-kafka")]
34 pub fn into_kafka_consumer_options(self) -> KafkaConsumerOptions {
35 self.kafka
36 }
37
38 #[cfg(feature = "backend-redis")]
39 pub fn into_redis_consumer_options(self) -> RedisConsumerOptions {
40 self.redis
41 }
42
43 #[cfg(feature = "backend-stdio")]
44 pub fn into_stdio_consumer_options(self) -> StdioConsumerOptions {
45 self.stdio
46 }
47
48 #[cfg(feature = "backend-file")]
49 pub fn into_file_consumer_options(self) -> FileConsumerOptions {
50 self.file
51 }
52
53 #[cfg(feature = "backend-kafka")]
54 pub fn set_kafka_consumer_options<F: FnOnce(&mut KafkaConsumerOptions)>(&mut self, func: F) {
56 func(&mut self.kafka)
57 }
58
59 #[cfg(feature = "backend-redis")]
60 pub fn set_redis_consumer_options<F: FnOnce(&mut RedisConsumerOptions)>(&mut self, func: F) {
62 func(&mut self.redis)
63 }
64
65 #[cfg(feature = "backend-stdio")]
66 pub fn set_stdio_consumer_options<F: FnOnce(&mut StdioConsumerOptions)>(&mut self, func: F) {
68 func(&mut self.stdio)
69 }
70
71 #[cfg(feature = "backend-file")]
72 pub fn set_file_consumer_options<F: FnOnce(&mut FileConsumerOptions)>(&mut self, func: F) {
74 func(&mut self.file)
75 }
76
77 pub fn set_auto_stream_reset(&mut self, _val: SeaStreamReset) {
78 #[cfg(feature = "backend-kafka")]
79 self.set_kafka_consumer_options(|options| {
80 options.set_auto_offset_reset(match _val {
81 SeaStreamReset::Earliest => AutoOffsetReset::Earliest,
82 SeaStreamReset::Latest => AutoOffsetReset::Latest,
83 });
84 });
85 #[cfg(feature = "backend-redis")]
86 self.set_redis_consumer_options(|options| {
87 options.set_auto_stream_reset(match _val {
88 SeaStreamReset::Earliest => RedisAutoStreamReset::Earliest,
89 SeaStreamReset::Latest => RedisAutoStreamReset::Latest,
90 });
91 });
92 #[cfg(feature = "backend-file")]
93 self.set_file_consumer_options(|options| {
94 options.set_auto_stream_reset(match _val {
95 SeaStreamReset::Earliest => FileAutoStreamReset::Earliest,
96 SeaStreamReset::Latest => FileAutoStreamReset::Latest,
97 });
98 });
99 }
100}
101
102impl ConsumerOptions for SeaConsumerOptions {
103 type Error = BackendErr;
104
105 fn new(mode: ConsumerMode) -> Self {
106 Self {
107 #[cfg(feature = "backend-kafka")]
108 kafka: KafkaConsumerOptions::new(mode),
109 #[cfg(feature = "backend-redis")]
110 redis: RedisConsumerOptions::new(mode),
111 #[cfg(feature = "backend-stdio")]
112 stdio: StdioConsumerOptions::new(mode),
113 #[cfg(feature = "backend-file")]
114 file: FileConsumerOptions::new(mode),
115 }
116 }
117
118 fn mode(&self) -> SeaResult<&ConsumerMode> {
120 #![allow(unreachable_code)]
121
122 #[cfg(feature = "backend-kafka")]
123 return self.kafka.mode().map_err(map_err);
124 #[cfg(feature = "backend-redis")]
125 return self.redis.mode().map_err(map_err);
126 #[cfg(feature = "backend-stdio")]
127 return self.stdio.mode().map_err(map_err);
128 #[cfg(feature = "backend-file")]
129 return self.file.mode().map_err(map_err);
130 }
131
132 fn consumer_group(&self) -> SeaResult<&ConsumerGroup> {
134 #![allow(unreachable_code)]
135
136 #[cfg(feature = "backend-kafka")]
137 return self.kafka.consumer_group().map_err(map_err);
138 #[cfg(feature = "backend-redis")]
139 return self.redis.consumer_group().map_err(map_err);
140 #[cfg(feature = "backend-stdio")]
141 return self.stdio.consumer_group().map_err(map_err);
142 #[cfg(feature = "backend-file")]
143 return self.file.consumer_group().map_err(map_err);
144 }
145
146 fn set_consumer_group(&mut self, group_id: ConsumerGroup) -> SeaResult<&mut Self> {
148 #![allow(clippy::redundant_clone)]
149 #[cfg(feature = "backend-kafka")]
150 self.kafka
151 .set_consumer_group(group_id.clone())
152 .map_err(map_err)?;
153 #[cfg(feature = "backend-redis")]
154 self.redis
155 .set_consumer_group(group_id.clone())
156 .map_err(map_err)?;
157 #[cfg(feature = "backend-stdio")]
158 self.stdio
159 .set_consumer_group(group_id.clone())
160 .map_err(map_err)?;
161 #[cfg(feature = "backend-file")]
162 self.file
163 .set_consumer_group(group_id.clone())
164 .map_err(map_err)?;
165 Ok(self)
166 }
167}