spring_stream/config/
mod.rs

1#[cfg(feature = "file")]
2pub mod file;
3#[cfg(feature = "kafka")]
4pub mod kafka;
5#[cfg(feature = "redis")]
6pub mod redis;
7#[cfg(feature = "stdio")]
8pub mod stdio;
9
10use crate::consumer::ConsumerOpts;
11use schemars::JsonSchema;
12use sea_streamer::{
13    ConsumerGroup, ConsumerMode, ConsumerOptions, SeaConnectOptions, SeaConsumerOptions,
14    SeaProducerOptions,
15};
16use serde::Deserialize;
17use spring::config::Configurable;
18
19spring::submit_config_schema!("stream", StreamConfig);
20
21#[derive(Debug, Configurable, Clone, JsonSchema, Deserialize)]
22#[config_prefix = "stream"]
23pub struct StreamConfig {
24    /// [streamer server uri][config]
25    ///
26    /// [config]: https://docs.rs/sea-streamer-types/latest/sea_streamer_types/struct.StreamerUri.html
27    pub(crate) uri: String,
28
29    #[cfg(feature = "kafka")]
30    pub(crate) kafka: Option<kafka::KafkaOptions>,
31    #[cfg(feature = "redis")]
32    pub(crate) redis: Option<redis::RedisOptions>,
33    #[cfg(feature = "stdio")]
34    pub(crate) stdio: Option<stdio::StdioOptions>,
35    #[cfg(feature = "file")]
36    pub(crate) file: Option<file::FileOptions>,
37}
38
39impl StreamConfig {
40    pub fn connect_options(&self) -> SeaConnectOptions {
41        let mut _connect_options = SeaConnectOptions::default();
42
43        #[cfg(feature = "kafka")]
44        if let Some(kafka) = &self.kafka {
45            _connect_options.set_kafka_connect_options(|opts| kafka.fill_connect_options(opts));
46        }
47        #[cfg(feature = "redis")]
48        if let Some(redis) = &self.redis {
49            _connect_options.set_redis_connect_options(|opts| redis.fill_connect_options(opts));
50        }
51        #[cfg(feature = "stdio")]
52        if let Some(stdio) = &self.stdio {
53            _connect_options.set_stdio_connect_options(|opts| stdio.fill_connect_options(opts));
54        }
55        #[cfg(feature = "file")]
56        if let Some(file) = &self.file {
57            _connect_options.set_file_connect_options(|opts| file.fill_connect_options(opts));
58        }
59        _connect_options
60    }
61
62    pub fn new_consumer_options(&self, ConsumerOpts(opts): ConsumerOpts) -> SeaConsumerOptions {
63        let _mode = opts.mode().ok();
64        let _group = opts.consumer_group().ok();
65        #[cfg(feature = "kafka")]
66        if let Some(kafka) = &self.kafka {
67            let mut consumer_options = kafka.new_consumer_options(_mode, _group);
68            consumer_options.set_kafka_consumer_options(|opts| kafka.fill_consumer_options(opts));
69            return consumer_options;
70        }
71        #[cfg(feature = "redis")]
72        if let Some(redis) = &self.redis {
73            let mut consumer_options = redis.new_consumer_options(_mode, _group);
74            consumer_options.set_redis_consumer_options(|opts| redis.fill_consumer_options(opts));
75            return consumer_options;
76        }
77        #[cfg(feature = "stdio")]
78        if let Some(stdio) = &self.stdio {
79            let mut consumer_options = stdio.new_consumer_options(_mode, _group);
80            consumer_options.set_stdio_consumer_options(|opts| stdio.fill_consumer_options(opts));
81            return consumer_options;
82        }
83        #[cfg(feature = "file")]
84        if let Some(file) = &self.file {
85            let mut consumer_options = file.new_consumer_options(_mode, _group);
86            consumer_options.set_file_consumer_options(|opts| file.fill_consumer_options(opts));
87            return consumer_options;
88        }
89        opts
90    }
91
92    pub fn new_producer_options(&self) -> SeaProducerOptions {
93        let mut _producer_options = SeaProducerOptions::default();
94        #[cfg(feature = "kafka")]
95        if let Some(kafka) = &self.kafka {
96            _producer_options.set_kafka_producer_options(|opts| kafka.fill_producer_options(opts));
97        }
98        #[cfg(feature = "redis")]
99        if let Some(redis) = &self.redis {
100            _producer_options.set_redis_producer_options(|opts| redis.fill_producer_options(opts));
101        }
102        #[cfg(feature = "stdio")]
103        if let Some(stdio) = &self.stdio {
104            _producer_options.set_stdio_producer_options(|opts| stdio.fill_producer_options(opts));
105        }
106        #[cfg(feature = "file")]
107        if let Some(file) = &self.file {
108            _producer_options.set_file_producer_options(|opts| file.fill_producer_options(opts));
109        }
110        _producer_options
111    }
112}
113
114#[allow(dead_code)]
115pub(crate) trait OptionsFiller {
116    type ConnectOptsType;
117    type ConsumerOptsType;
118    type ProducerOptsType;
119    fn fill_connect_options(&self, opts: &mut Self::ConnectOptsType);
120    fn fill_consumer_options(&self, opts: &mut Self::ConsumerOptsType);
121    fn fill_producer_options(&self, opts: &mut Self::ProducerOptsType);
122
123    fn new_consumer_options(
124        &self,
125        mode: Option<&ConsumerMode>,
126        group_id: Option<&ConsumerGroup>,
127    ) -> SeaConsumerOptions {
128        let mode = match mode.or_else(|| self.default_consumer_mode()) {
129            Some(mode) => mode.to_owned(),
130            None => ConsumerMode::default(),
131        };
132        let mut opts = SeaConsumerOptions::new(mode);
133        let group_id = match group_id {
134            Some(group) => Some(group.name().to_string()),
135            None => self.default_consumer_group_id(),
136        };
137        if let Some(group_id) = group_id {
138            let _ = opts.set_consumer_group(ConsumerGroup::new(group_id));
139        }
140        opts
141    }
142    fn default_consumer_mode(&self) -> Option<&ConsumerMode>;
143    fn default_consumer_group_id(&self) -> Option<String>;
144}
145
146#[allow(dead_code)]
147#[derive(Debug, Clone, JsonSchema, Deserialize)]
148#[serde(remote = "ConsumerMode")]
149pub(crate) enum ConsumerModeRef {
150    /// This is the 'vanilla' stream consumer. It does not auto-commit, and thus only consumes messages from now on.
151    RealTime,
152    /// When the process restarts, it will resume the stream from the previous committed sequence.
153    Resumable,
154    /// You should assign a consumer group manually. The load-balancing mechanism is implementation-specific.
155    LoadBalanced,
156}