faucet_source_redis/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
/// Kind of Redis data a source is pointed at, together with the addressing
/// details each kind needs.
///
/// Serialized as an internally tagged enum: the variant name is stored under
/// the `"type"` key next to the variant's own fields, for example
/// `{ "type": "List", "key": "items" }`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum RedisSourceType {
    /// Source addressed by a single key.
    // NOTE(review): presumably the key of a Redis list — confirm against the reader.
    List {
        /// Key to read from.
        key: String,
    },
    /// Source addressed by a key plus optional group, consumer and count settings.
    // NOTE(review): presumably a Redis stream, with `group`/`consumer` selecting
    // consumer-group reads — confirm against the reader.
    Stream {
        /// Key to read from.
        key: String,
        /// Optional group name; `None` when not configured.
        group: Option<String>,
        /// Optional consumer name; `None` when not configured.
        consumer: Option<String>,
        /// Optional count; `None` when not configured.
        // NOTE(review): looks like a per-read entry limit — confirm.
        count: Option<usize>,
    },
    /// Source addressed by a key pattern such as `user:*`.
    Keys {
        /// Pattern selecting the keys.
        // NOTE(review): presumably matched with Redis glob semantics — confirm.
        pattern: String,
    },
}
41
/// Configuration for the Redis source.
///
/// `Debug` is implemented by hand for this type (not derived) so that the
/// `url` field is never printed.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub struct RedisSourceConfig {
    /// Connection URL, e.g. `redis://127.0.0.1:6379`.
    pub url: String,
    /// Which Redis data the source is pointed at.
    pub source_type: RedisSourceType,
    /// Optional record limit; `None` when unset.
    // NOTE(review): presumably an upper bound on the records read — confirm
    // against the code that consumes this config.
    pub max_records: Option<usize>,
    /// Batch size; takes the value of `DEFAULT_BATCH_SIZE` when the field is
    /// missing from the deserialized input.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
}
71
/// Returns `DEFAULT_BATCH_SIZE`.
///
/// Referenced by the `#[serde(default = "default_batch_size")]` attribute on
/// `RedisSourceConfig::batch_size`, so it supplies the value when that field
/// is absent during deserialization.
fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE
}
75
76impl std::fmt::Debug for RedisSourceConfig {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("RedisSourceConfig")
79 .field("url", &"***")
80 .field("source_type", &self.source_type)
81 .field("max_records", &self.max_records)
82 .field("batch_size", &self.batch_size)
83 .finish()
84 }
85}
86
87impl RedisSourceConfig {
88 pub fn new(url: impl Into<String>, source_type: RedisSourceType) -> Self {
90 Self {
91 url: url.into(),
92 source_type,
93 max_records: None,
94 batch_size: DEFAULT_BATCH_SIZE,
95 }
96 }
97
98 pub fn max_records(mut self, max: usize) -> Self {
100 self.max_records = Some(max);
101 self
102 }
103
104 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
110 self.batch_size = batch_size;
111 self
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal list-backed config shared by the batch-size tests.
    fn list_config() -> RedisSourceConfig {
        RedisSourceConfig::new("redis://localhost", RedisSourceType::List { key: "k".into() })
    }

    /// A fresh config has no record limit and never leaks its URL via `Debug`.
    #[test]
    fn default_config_list() {
        let source_type = RedisSourceType::List {
            key: "my_list".into(),
        };
        let config = RedisSourceConfig::new("redis://127.0.0.1:6379", source_type);
        assert_eq!(config.max_records, None);

        let rendered = format!("{:?}", config);
        assert!(rendered.contains("***"));
        assert!(!rendered.contains("127.0.0.1"));
    }

    /// `max_records` stores the supplied limit.
    #[test]
    fn builder_methods() {
        let stream = RedisSourceType::Stream {
            key: "events".into(),
            group: Some("mygroup".into()),
            consumer: Some("worker1".into()),
            count: Some(100),
        };
        let config = RedisSourceConfig::new("redis://localhost", stream).max_records(500);
        assert_eq!(config.max_records, Some(500));
    }

    /// The `Keys` variant keeps its pattern untouched.
    #[test]
    fn keys_source_type() {
        let config = RedisSourceConfig::new(
            "redis://localhost",
            RedisSourceType::Keys {
                pattern: "user:*".into(),
            },
        );
        match &config.source_type {
            RedisSourceType::Keys { pattern } => assert_eq!(pattern, "user:*"),
            _ => panic!("expected Keys variant"),
        }
    }

    /// `new` starts from the crate-wide default batch size.
    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let built = list_config();
        assert_eq!(built.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// `with_batch_size` replaces the default.
    #[test]
    fn with_batch_size_overrides_default() {
        let built = list_config().with_batch_size(500);
        assert_eq!(built.batch_size, 500);
    }

    /// Zero is stored as-is and passes `validate_batch_size`.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let built = list_config().with_batch_size(0);
        assert_eq!(built.batch_size, 0);

        let verdict = faucet_core::validate_batch_size(built.batch_size);
        assert!(verdict.is_ok());
    }

    /// The builder stores an oversized value; `validate_batch_size` rejects it.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let oversized = faucet_core::MAX_BATCH_SIZE + 1;
        let built = list_config().with_batch_size(oversized);

        let verdict = faucet_core::validate_batch_size(built.batch_size);
        assert!(verdict.is_err());
    }

    /// An explicit `batch_size` in JSON is honored.
    #[test]
    fn batch_size_deserializes_from_json() {
        let payload = r#"{
            "url": "redis://localhost",
            "source_type": { "type": "List", "key": "items" },
            "batch_size": 250
        }"#;
        let parsed: RedisSourceConfig = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed.batch_size, 250);
    }

    /// A JSON document without `batch_size` falls back to the default.
    #[test]
    fn batch_size_field_omitted_defaults_to_default_batch_size() {
        let payload = r#"{
            "url": "redis://localhost",
            "source_type": { "type": "List", "key": "items" }
        }"#;
        let parsed: RedisSourceConfig = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}