spring_stream/config/
mod.rs1#[cfg(feature = "file")]
2pub mod file;
3#[cfg(feature = "kafka")]
4pub mod kafka;
5#[cfg(feature = "redis")]
6pub mod redis;
7#[cfg(feature = "stdio")]
8pub mod stdio;
9
10use crate::consumer::ConsumerOpts;
11use schemars::JsonSchema;
12use sea_streamer::{
13 ConsumerGroup, ConsumerMode, ConsumerOptions, SeaConnectOptions, SeaConsumerOptions,
14 SeaProducerOptions,
15};
16use serde::Deserialize;
17use spring::config::Configurable;
18
19spring::submit_config_schema!("stream", StreamConfig);
20
21#[derive(Debug, Configurable, Clone, JsonSchema, Deserialize)]
22#[config_prefix = "stream"]
23pub struct StreamConfig {
24 pub(crate) uri: String,
28
29 #[cfg(feature = "kafka")]
30 pub(crate) kafka: Option<kafka::KafkaOptions>,
31 #[cfg(feature = "redis")]
32 pub(crate) redis: Option<redis::RedisOptions>,
33 #[cfg(feature = "stdio")]
34 pub(crate) stdio: Option<stdio::StdioOptions>,
35 #[cfg(feature = "file")]
36 pub(crate) file: Option<file::FileOptions>,
37}
38
39impl StreamConfig {
40 pub fn connect_options(&self) -> SeaConnectOptions {
41 let mut _connect_options = SeaConnectOptions::default();
42
43 #[cfg(feature = "kafka")]
44 if let Some(kafka) = &self.kafka {
45 _connect_options.set_kafka_connect_options(|opts| kafka.fill_connect_options(opts));
46 }
47 #[cfg(feature = "redis")]
48 if let Some(redis) = &self.redis {
49 _connect_options.set_redis_connect_options(|opts| redis.fill_connect_options(opts));
50 }
51 #[cfg(feature = "stdio")]
52 if let Some(stdio) = &self.stdio {
53 _connect_options.set_stdio_connect_options(|opts| stdio.fill_connect_options(opts));
54 }
55 #[cfg(feature = "file")]
56 if let Some(file) = &self.file {
57 _connect_options.set_file_connect_options(|opts| file.fill_connect_options(opts));
58 }
59 _connect_options
60 }
61
62 pub fn new_consumer_options(&self, ConsumerOpts(opts): ConsumerOpts) -> SeaConsumerOptions {
63 let _mode = opts.mode().ok();
64 let _group = opts.consumer_group().ok();
65 #[cfg(feature = "kafka")]
66 if let Some(kafka) = &self.kafka {
67 let mut consumer_options = kafka.new_consumer_options(_mode, _group);
68 consumer_options.set_kafka_consumer_options(|opts| kafka.fill_consumer_options(opts));
69 return consumer_options;
70 }
71 #[cfg(feature = "redis")]
72 if let Some(redis) = &self.redis {
73 let mut consumer_options = redis.new_consumer_options(_mode, _group);
74 consumer_options.set_redis_consumer_options(|opts| redis.fill_consumer_options(opts));
75 return consumer_options;
76 }
77 #[cfg(feature = "stdio")]
78 if let Some(stdio) = &self.stdio {
79 let mut consumer_options = stdio.new_consumer_options(_mode, _group);
80 consumer_options.set_stdio_consumer_options(|opts| stdio.fill_consumer_options(opts));
81 return consumer_options;
82 }
83 #[cfg(feature = "file")]
84 if let Some(file) = &self.file {
85 let mut consumer_options = file.new_consumer_options(_mode, _group);
86 consumer_options.set_file_consumer_options(|opts| file.fill_consumer_options(opts));
87 return consumer_options;
88 }
89 opts
90 }
91
92 pub fn new_producer_options(&self) -> SeaProducerOptions {
93 let mut _producer_options = SeaProducerOptions::default();
94 #[cfg(feature = "kafka")]
95 if let Some(kafka) = &self.kafka {
96 _producer_options.set_kafka_producer_options(|opts| kafka.fill_producer_options(opts));
97 }
98 #[cfg(feature = "redis")]
99 if let Some(redis) = &self.redis {
100 _producer_options.set_redis_producer_options(|opts| redis.fill_producer_options(opts));
101 }
102 #[cfg(feature = "stdio")]
103 if let Some(stdio) = &self.stdio {
104 _producer_options.set_stdio_producer_options(|opts| stdio.fill_producer_options(opts));
105 }
106 #[cfg(feature = "file")]
107 if let Some(file) = &self.file {
108 _producer_options.set_file_producer_options(|opts| file.fill_producer_options(opts));
109 }
110 _producer_options
111 }
112}
113
114#[allow(dead_code)]
115pub(crate) trait OptionsFiller {
116 type ConnectOptsType;
117 type ConsumerOptsType;
118 type ProducerOptsType;
119 fn fill_connect_options(&self, opts: &mut Self::ConnectOptsType);
120 fn fill_consumer_options(&self, opts: &mut Self::ConsumerOptsType);
121 fn fill_producer_options(&self, opts: &mut Self::ProducerOptsType);
122
123 fn new_consumer_options(
124 &self,
125 mode: Option<&ConsumerMode>,
126 group_id: Option<&ConsumerGroup>,
127 ) -> SeaConsumerOptions {
128 let mode = match mode.or_else(|| self.default_consumer_mode()) {
129 Some(mode) => mode.to_owned(),
130 None => ConsumerMode::default(),
131 };
132 let mut opts = SeaConsumerOptions::new(mode);
133 let group_id = match group_id {
134 Some(group) => Some(group.name().to_string()),
135 None => self.default_consumer_group_id(),
136 };
137 if let Some(group_id) = group_id {
138 let _ = opts.set_consumer_group(ConsumerGroup::new(group_id));
139 }
140 opts
141 }
142 fn default_consumer_mode(&self) -> Option<&ConsumerMode>;
143 fn default_consumer_group_id(&self) -> Option<String>;
144}
145
146#[allow(dead_code)]
147#[derive(Debug, Clone, JsonSchema, Deserialize)]
148#[serde(remote = "ConsumerMode")]
149pub(crate) enum ConsumerModeRef {
150 RealTime,
152 Resumable,
154 LoadBalanced,
156}