faucet_sink_redis/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(tag = "type")]
10pub enum RedisSinkType {
11 List {
13 key: String,
15 },
16 Stream {
18 key: String,
20 },
21 KeyValue {
23 key_field: String,
25 },
26}
27
28#[derive(Clone, Serialize, Deserialize, JsonSchema)]
30pub struct RedisSinkConfig {
31 pub url: String,
33 pub sink_type: RedisSinkType,
35 #[serde(default = "default_batch_size")]
48 pub batch_size: usize,
49}
50
51fn default_batch_size() -> usize {
52 DEFAULT_BATCH_SIZE
53}
54
55impl std::fmt::Debug for RedisSinkConfig {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("RedisSinkConfig")
58 .field("url", &"***")
59 .field("sink_type", &self.sink_type)
60 .field("batch_size", &self.batch_size)
61 .finish()
62 }
63}
64
65impl RedisSinkConfig {
66 pub fn new(url: impl Into<String>, sink_type: RedisSinkType) -> Self {
68 Self {
69 url: url.into(),
70 sink_type,
71 batch_size: DEFAULT_BATCH_SIZE,
72 }
73 }
74
75 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
81 self.batch_size = batch_size;
82 self
83 }
84}
85
86#[cfg(test)]
87mod tests {
88 use super::*;
89
90 #[test]
91 fn default_config() {
92 let config = RedisSinkConfig::new(
93 "redis://127.0.0.1:6379",
94 RedisSinkType::List {
95 key: "my_list".into(),
96 },
97 );
98 assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
99 let debug = format!("{config:?}");
101 assert!(debug.contains("***"));
102 assert!(!debug.contains("127.0.0.1"));
103 }
104
105 #[test]
106 fn with_batch_size_overrides_default() {
107 let config = RedisSinkConfig::new(
108 "redis://localhost",
109 RedisSinkType::Stream {
110 key: "events".into(),
111 },
112 )
113 .with_batch_size(100);
114 assert_eq!(config.batch_size, 100);
115 }
116
117 #[test]
118 fn key_value_sink_type() {
119 let config = RedisSinkConfig::new(
120 "redis://localhost",
121 RedisSinkType::KeyValue {
122 key_field: "id".into(),
123 },
124 );
125 if let RedisSinkType::KeyValue { ref key_field } = config.sink_type {
126 assert_eq!(key_field, "id");
127 } else {
128 panic!("expected KeyValue variant");
129 }
130 }
131
132 #[test]
133 fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
134 let config =
135 RedisSinkConfig::new("redis://localhost", RedisSinkType::List { key: "k".into() })
136 .with_batch_size(0);
137 assert_eq!(config.batch_size, 0);
138 assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
139 }
140
141 #[test]
142 fn batch_size_above_max_is_rejected_by_validate_batch_size() {
143 let config =
144 RedisSinkConfig::new("redis://localhost", RedisSinkType::List { key: "k".into() })
145 .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
146 assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
147 }
148
149 #[test]
150 fn batch_size_deserializes_from_json() {
151 let json = r#"{
152 "url": "redis://localhost",
153 "sink_type": { "type": "List", "key": "items" },
154 "batch_size": 250
155 }"#;
156 let config: RedisSinkConfig = serde_json::from_str(json).unwrap();
157 assert_eq!(config.batch_size, 250);
158 }
159
160 #[test]
161 fn batch_size_defaults_when_absent_from_json() {
162 let json = r#"{
163 "url": "redis://localhost",
164 "sink_type": { "type": "List", "key": "items" }
165 }"#;
166 let config: RedisSinkConfig = serde_json::from_str(json).unwrap();
167 assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
168 }
169}