spring_stream/config/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#[cfg(feature = "file")]
pub mod file;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "redis")]
pub mod redis;
#[cfg(feature = "stdio")]
pub mod stdio;

use crate::consumer::ConsumerOpts;
use schemars::JsonSchema;
use sea_streamer::{
    ConsumerGroup, ConsumerMode, ConsumerOptions, SeaConnectOptions, SeaConsumerOptions,
    SeaProducerOptions,
};
use serde::Deserialize;
use spring::config::Configurable;

#[derive(Debug, Configurable, Clone, JsonSchema, Deserialize)]
#[config_prefix = "stream"]
pub struct StreamConfig {
    /// [streamer server uri][config]
    /// 
    /// [config]: https://docs.rs/sea-streamer-types/latest/sea_streamer_types/struct.StreamerUri.html
    pub(crate) uri: String,

    #[cfg(feature = "kafka")]
    pub(crate) kafka: Option<kafka::KafkaOptions>,
    #[cfg(feature = "redis")]
    pub(crate) redis: Option<redis::RedisOptions>,
    #[cfg(feature = "stdio")]
    pub(crate) stdio: Option<stdio::StdioOptions>,
    #[cfg(feature = "file")]
    pub(crate) file: Option<file::FileOptions>,
}

impl StreamConfig {
    pub fn connect_options(&self) -> SeaConnectOptions {
        let mut _connect_options = SeaConnectOptions::default();

        #[cfg(feature = "kafka")]
        if let Some(kafka) = &self.kafka {
            _connect_options.set_kafka_connect_options(|opts| kafka.fill_connect_options(opts));
        }
        #[cfg(feature = "redis")]
        if let Some(redis) = &self.redis {
            _connect_options.set_redis_connect_options(|opts| redis.fill_connect_options(opts));
        }
        #[cfg(feature = "stdio")]
        if let Some(stdio) = &self.stdio {
            _connect_options.set_stdio_connect_options(|opts| stdio.fill_connect_options(opts));
        }
        #[cfg(feature = "file")]
        if let Some(file) = &self.file {
            _connect_options.set_file_connect_options(|opts| file.fill_connect_options(opts));
        }
        _connect_options
    }

    pub fn new_consumer_options(&self, ConsumerOpts(opts): ConsumerOpts) -> SeaConsumerOptions {
        let _mode = opts.mode().ok();
        let _group = match opts.consumer_group() {
            Ok(group) => Some(group),
            _ => None,
        };
        #[cfg(feature = "kafka")]
        if let Some(kafka) = &self.kafka {
            let mut consumer_options = kafka.new_consumer_options(_mode, _group);
            consumer_options.set_kafka_consumer_options(|opts| kafka.fill_consumer_options(opts));
            return consumer_options;
        }
        #[cfg(feature = "redis")]
        if let Some(redis) = &self.redis {
            let mut consumer_options = redis.new_consumer_options(_mode, _group);
            consumer_options.set_redis_consumer_options(|opts| redis.fill_consumer_options(opts));
            return consumer_options;
        }
        #[cfg(feature = "stdio")]
        if let Some(stdio) = &self.stdio {
            let mut consumer_options = stdio.new_consumer_options(_mode, _group);
            consumer_options.set_stdio_consumer_options(|opts| stdio.fill_consumer_options(opts));
            return consumer_options;
        }
        #[cfg(feature = "file")]
        if let Some(file) = &self.file {
            let mut consumer_options = file.new_consumer_options(_mode, _group);
            consumer_options.set_file_consumer_options(|opts| file.fill_consumer_options(opts));
            return consumer_options;
        }
        opts
    }

    pub fn new_producer_options(&self) -> SeaProducerOptions {
        let mut _producer_options = SeaProducerOptions::default();
        #[cfg(feature = "kafka")]
        if let Some(kafka) = &self.kafka {
            _producer_options.set_kafka_producer_options(|opts| kafka.fill_producer_options(opts));
        }
        #[cfg(feature = "redis")]
        if let Some(redis) = &self.redis {
            _producer_options.set_redis_producer_options(|opts| redis.fill_producer_options(opts));
        }
        #[cfg(feature = "stdio")]
        if let Some(stdio) = &self.stdio {
            _producer_options.set_stdio_producer_options(|opts| stdio.fill_producer_options(opts));
        }
        #[cfg(feature = "file")]
        if let Some(file) = &self.file {
            _producer_options.set_file_producer_options(|opts| file.fill_producer_options(opts));
        }
        _producer_options
    }
}

#[allow(dead_code)]
pub(crate) trait OptionsFiller {
    type ConnectOptsType;
    type ConsumerOptsType;
    type ProducerOptsType;
    fn fill_connect_options(&self, opts: &mut Self::ConnectOptsType);
    fn fill_consumer_options(&self, opts: &mut Self::ConsumerOptsType);
    fn fill_producer_options(&self, opts: &mut Self::ProducerOptsType);

    fn new_consumer_options(
        &self,
        mode: Option<&ConsumerMode>,
        group_id: Option<&ConsumerGroup>,
    ) -> SeaConsumerOptions {
        let mode = match mode.or_else(|| self.default_consumer_mode()) {
            Some(mode) => mode.to_owned(),
            None => ConsumerMode::default(),
        };
        let mut opts = SeaConsumerOptions::new(mode);
        let group_id = match group_id {
            Some(group) => Some(group.name().to_string()),
            None => self.default_consumer_group_id(),
        };
        if let Some(group_id) = group_id {
            let _ = opts.set_consumer_group(ConsumerGroup::new(group_id));
        }
        opts
    }
    fn default_consumer_mode(&self) -> Option<&ConsumerMode>;
    fn default_consumer_group_id(&self) -> Option<String>;
}

#[allow(dead_code)]
#[derive(Debug, Clone, JsonSchema, Deserialize)]
#[serde(remote = "ConsumerMode")]
pub(crate) enum ConsumerModeRef {
    /// This is the 'vanilla' stream consumer. It does not auto-commit, and thus only consumes messages from now on.
    RealTime,
    /// When the process restarts, it will resume the stream from the previous committed sequence.
    Resumable,
    /// You should assign a consumer group manually. The load-balancing mechanism is implementation-specific.
    LoadBalanced,
}