use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// The kind of Redis source a `RedisSourceConfig` refers to: a list, a stream, or a key pattern.
///
/// Serialized with an internal `type` tag: the variant name is stored under the `type` key
/// next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum RedisSourceType {
/// A list source, identified by `key`.
List {
/// Key naming the list.
key: String,
},
// NOTE(review): `group` and `consumer` look like consumer-group settings for stream reads and
// `count` like a per-read batch size; the code that consumes them is not in this file, so
// confirm there before relying on that reading.
/// A stream source, identified by `key`, with optional `group`, `consumer` and `count` settings.
Stream {
/// Key naming the stream.
key: String,
/// Optional group name.
group: Option<String>,
/// Optional consumer name.
consumer: Option<String>,
/// Optional count.
count: Option<usize>,
},
// The tests in this file use the glob-style value `user:*` for `pattern`.
/// A key-pattern source.
Keys {
/// Pattern string.
pattern: String,
},
}
// `Debug` is deliberately absent from the derive list: this file implements it by hand so that
// `url` is rendered as `***` instead of its real value.
/// Configuration for a Redis source.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub struct RedisSourceConfig {
// Only `Debug` masks this field; the derived `Serialize` impl still emits it verbatim.
/// Redis URL, e.g. `redis://127.0.0.1:6379`.
pub url: String,
/// Which list, stream, or key pattern this source uses.
pub source_type: RedisSourceType,
// NOTE(review): presumably caps how many records are read from the source; the code that
// enforces it is not in this file, so confirm there.
/// Optional record limit; `None` when no limit has been set.
pub max_records: Option<usize>,
}
/// Hand-written `Debug` that prints every field except the real `url`, which is always
/// rendered as the placeholder `"***"`.
impl std::fmt::Debug for RedisSourceConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure exhaustively so a newly added field cannot be forgotten here;
        // the URL itself is intentionally never touched.
        let Self {
            url: _,
            source_type,
            max_records,
        } = self;

        let mut out = f.debug_struct("RedisSourceConfig");
        out.field("url", &"***");
        out.field("source_type", source_type);
        out.field("max_records", max_records);
        out.finish()
    }
}
impl RedisSourceConfig {
    /// Creates a configuration for `url` and `source_type` with no record limit.
    ///
    /// `url` accepts anything convertible into a `String`. `max_records` starts out as
    /// `None`; chain [`RedisSourceConfig::max_records`] to set it.
    #[must_use]
    pub fn new(url: impl Into<String>, source_type: RedisSourceType) -> Self {
        Self {
            url: url.into(),
            source_type,
            max_records: None,
        }
    }

    /// Sets `max_records` to `Some(max)` and returns the updated configuration.
    ///
    /// Takes `self` by value for builder-style chaining, so the change exists only in the
    /// returned value.
    #[must_use = "this returns the updated configuration; it does not modify one in place"]
    pub fn max_records(mut self, max: usize) -> Self {
        self.max_records = Some(max);
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built list config has no record limit and keeps the URL out of `Debug` output.
    #[test]
    fn default_config_list() {
        let source = RedisSourceType::List {
            key: String::from("my_list"),
        };
        let config = RedisSourceConfig::new("redis://127.0.0.1:6379", source);

        assert_eq!(config.max_records, None);

        let rendered = format!("{config:?}");
        assert!(rendered.contains("***"));
        assert!(!rendered.contains("127.0.0.1"));
    }

    /// The `max_records` builder stores the given limit.
    #[test]
    fn builder_methods() {
        let stream = RedisSourceType::Stream {
            key: String::from("events"),
            group: Some(String::from("mygroup")),
            consumer: Some(String::from("worker1")),
            count: Some(100),
        };
        let limited = RedisSourceConfig::new("redis://localhost", stream).max_records(500);

        assert_eq!(limited.max_records, Some(500));
    }

    /// A `Keys` source keeps its variant and pattern when stored in the config.
    #[test]
    fn keys_source_type() {
        let source = RedisSourceType::Keys {
            pattern: String::from("user:*"),
        };
        let config = RedisSourceConfig::new("redis://localhost", source);

        match &config.source_type {
            RedisSourceType::Keys { pattern } => assert_eq!(pattern, "user:*"),
            _ => panic!("expected Keys variant"),
        }
    }
}