sea_streamer_socket/
consumer_options.rs

1#[cfg(feature = "backend-file")]
2use sea_streamer_file::{AutoStreamReset as FileAutoStreamReset, FileConsumerOptions};
3#[cfg(feature = "backend-kafka")]
4use sea_streamer_kafka::{AutoOffsetReset, KafkaConsumerOptions};
5#[cfg(feature = "backend-redis")]
6use sea_streamer_redis::{AutoStreamReset as RedisAutoStreamReset, RedisConsumerOptions};
7#[cfg(feature = "backend-stdio")]
8use sea_streamer_stdio::StdioConsumerOptions;
9
10use crate::{map_err, BackendErr, SeaResult};
11use sea_streamer_types::{ConsumerGroup, ConsumerMode, ConsumerOptions};
12
13#[derive(Debug, Default, Clone)]
14/// `sea-streamer-socket` concrete type of ConsumerOptions.
15pub struct SeaConsumerOptions {
16    #[cfg(feature = "backend-kafka")]
17    kafka: KafkaConsumerOptions,
18    #[cfg(feature = "backend-redis")]
19    redis: RedisConsumerOptions,
20    #[cfg(feature = "backend-stdio")]
21    stdio: StdioConsumerOptions,
22    #[cfg(feature = "backend-file")]
23    file: FileConsumerOptions,
24}
25
26#[derive(Debug, Copy, Clone, PartialEq, Eq)]
27pub enum SeaStreamReset {
28    Earliest,
29    Latest,
30}
31
32impl SeaConsumerOptions {
33    #[cfg(feature = "backend-kafka")]
34    pub fn into_kafka_consumer_options(self) -> KafkaConsumerOptions {
35        self.kafka
36    }
37
38    #[cfg(feature = "backend-redis")]
39    pub fn into_redis_consumer_options(self) -> RedisConsumerOptions {
40        self.redis
41    }
42
43    #[cfg(feature = "backend-stdio")]
44    pub fn into_stdio_consumer_options(self) -> StdioConsumerOptions {
45        self.stdio
46    }
47
48    #[cfg(feature = "backend-file")]
49    pub fn into_file_consumer_options(self) -> FileConsumerOptions {
50        self.file
51    }
52
53    #[cfg(feature = "backend-kafka")]
54    /// Set options that only applies to Kafka
55    pub fn set_kafka_consumer_options<F: FnOnce(&mut KafkaConsumerOptions)>(&mut self, func: F) {
56        func(&mut self.kafka)
57    }
58
59    #[cfg(feature = "backend-redis")]
60    /// Set options that only applies to Redis
61    pub fn set_redis_consumer_options<F: FnOnce(&mut RedisConsumerOptions)>(&mut self, func: F) {
62        func(&mut self.redis)
63    }
64
65    #[cfg(feature = "backend-stdio")]
66    /// Set options that only applies to Stdio
67    pub fn set_stdio_consumer_options<F: FnOnce(&mut StdioConsumerOptions)>(&mut self, func: F) {
68        func(&mut self.stdio)
69    }
70
71    #[cfg(feature = "backend-file")]
72    /// Set options that only applies to File
73    pub fn set_file_consumer_options<F: FnOnce(&mut FileConsumerOptions)>(&mut self, func: F) {
74        func(&mut self.file)
75    }
76
77    pub fn set_auto_stream_reset(&mut self, _val: SeaStreamReset) {
78        #[cfg(feature = "backend-kafka")]
79        self.set_kafka_consumer_options(|options| {
80            options.set_auto_offset_reset(match _val {
81                SeaStreamReset::Earliest => AutoOffsetReset::Earliest,
82                SeaStreamReset::Latest => AutoOffsetReset::Latest,
83            });
84        });
85        #[cfg(feature = "backend-redis")]
86        self.set_redis_consumer_options(|options| {
87            options.set_auto_stream_reset(match _val {
88                SeaStreamReset::Earliest => RedisAutoStreamReset::Earliest,
89                SeaStreamReset::Latest => RedisAutoStreamReset::Latest,
90            });
91        });
92        #[cfg(feature = "backend-file")]
93        self.set_file_consumer_options(|options| {
94            options.set_auto_stream_reset(match _val {
95                SeaStreamReset::Earliest => FileAutoStreamReset::Earliest,
96                SeaStreamReset::Latest => FileAutoStreamReset::Latest,
97            });
98        });
99    }
100}
101
102impl ConsumerOptions for SeaConsumerOptions {
103    type Error = BackendErr;
104
105    fn new(mode: ConsumerMode) -> Self {
106        Self {
107            #[cfg(feature = "backend-kafka")]
108            kafka: KafkaConsumerOptions::new(mode),
109            #[cfg(feature = "backend-redis")]
110            redis: RedisConsumerOptions::new(mode),
111            #[cfg(feature = "backend-stdio")]
112            stdio: StdioConsumerOptions::new(mode),
113            #[cfg(feature = "backend-file")]
114            file: FileConsumerOptions::new(mode),
115        }
116    }
117
118    /// Get currently set ConsumerMode
119    fn mode(&self) -> SeaResult<&ConsumerMode> {
120        #![allow(unreachable_code)]
121
122        #[cfg(feature = "backend-kafka")]
123        return self.kafka.mode().map_err(map_err);
124        #[cfg(feature = "backend-redis")]
125        return self.redis.mode().map_err(map_err);
126        #[cfg(feature = "backend-stdio")]
127        return self.stdio.mode().map_err(map_err);
128        #[cfg(feature = "backend-file")]
129        return self.file.mode().map_err(map_err);
130    }
131
132    /// Get currently set consumer group; may return `StreamErr::ConsumerGroupNotSet`.
133    fn consumer_group(&self) -> SeaResult<&ConsumerGroup> {
134        #![allow(unreachable_code)]
135
136        #[cfg(feature = "backend-kafka")]
137        return self.kafka.consumer_group().map_err(map_err);
138        #[cfg(feature = "backend-redis")]
139        return self.redis.consumer_group().map_err(map_err);
140        #[cfg(feature = "backend-stdio")]
141        return self.stdio.consumer_group().map_err(map_err);
142        #[cfg(feature = "backend-file")]
143        return self.file.consumer_group().map_err(map_err);
144    }
145
146    /// Set consumer group for this consumer. Note the semantic is implementation-specific.
147    fn set_consumer_group(&mut self, group_id: ConsumerGroup) -> SeaResult<&mut Self> {
148        #![allow(clippy::redundant_clone)]
149        #[cfg(feature = "backend-kafka")]
150        self.kafka
151            .set_consumer_group(group_id.clone())
152            .map_err(map_err)?;
153        #[cfg(feature = "backend-redis")]
154        self.redis
155            .set_consumer_group(group_id.clone())
156            .map_err(map_err)?;
157        #[cfg(feature = "backend-stdio")]
158        self.stdio
159            .set_consumer_group(group_id.clone())
160            .map_err(map_err)?;
161        #[cfg(feature = "backend-file")]
162        self.file
163            .set_consumer_group(group_id.clone())
164            .map_err(map_err)?;
165        Ok(self)
166    }
167}