Skip to main content

faucet_sink_redis/
config.rs

1//! Redis sink configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// The type of Redis data structure to write to.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(tag = "type")]
10pub enum RedisSinkType {
11    /// Append records to a Redis list via `RPUSH`.
12    List {
13        /// The list key.
14        key: String,
15    },
16    /// Add entries to a Redis stream via `XADD`.
17    Stream {
18        /// The stream key.
19        key: String,
20    },
21    /// Set individual keys via `SET`, using a field from each record as the key.
22    KeyValue {
23        /// The field name in each record whose value becomes the Redis key.
24        key_field: String,
25    },
26}
27
28/// Configuration for the Redis sink connector.
29#[derive(Clone, Serialize, Deserialize, JsonSchema)]
30pub struct RedisSinkConfig {
31    /// Redis connection URL (e.g. `"redis://127.0.0.1:6379"`).
32    pub url: String,
33    /// The type of Redis data structure to write to.
34    pub sink_type: RedisSinkType,
35    /// Maximum number of commands packed into a single Redis pipeline.
36    /// Defaults to [`DEFAULT_BATCH_SIZE`] (1000), which is a comfortable
37    /// per-pipeline working set for Redis: pipelined commands are cheap,
38    /// so bigger windows amortise the round-trip but very large pipelines
39    /// can stall other clients sharing the server.
40    ///
41    /// When `write_batch` receives a slice larger than `batch_size`, the
42    /// sink re-chunks it into `batch_size` slices and issues one Redis
43    /// pipeline per chunk. `batch_size = 0` is the **"no batching"
44    /// sentinel**: the entire records slice is packed into a single
45    /// Redis pipeline, no matter how large, preserving upstream
46    /// [`StreamPage`](faucet_core::StreamPage) framing.
47    #[serde(default = "default_batch_size")]
48    pub batch_size: usize,
49}
50
51fn default_batch_size() -> usize {
52    DEFAULT_BATCH_SIZE
53}
54
55impl std::fmt::Debug for RedisSinkConfig {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("RedisSinkConfig")
58            .field("url", &"***")
59            .field("sink_type", &self.sink_type)
60            .field("batch_size", &self.batch_size)
61            .finish()
62    }
63}
64
65impl RedisSinkConfig {
66    /// Create a new config with the given URL and sink type.
67    pub fn new(url: impl Into<String>, sink_type: RedisSinkType) -> Self {
68        Self {
69            url: url.into(),
70            sink_type,
71            batch_size: DEFAULT_BATCH_SIZE,
72        }
73    }
74
75    /// Set the maximum number of commands per Redis pipeline.
76    ///
77    /// Pass `0` to opt out of re-chunking — the entire records slice handed
78    /// to `write_batch` is packed into a single Redis pipeline, preserving
79    /// upstream [`StreamPage`](faucet_core::StreamPage) framing.
80    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
81        self.batch_size = batch_size;
82        self
83    }
84}
85
86#[cfg(test)]
87mod tests {
88    use super::*;
89
90    #[test]
91    fn default_config() {
92        let config = RedisSinkConfig::new(
93            "redis://127.0.0.1:6379",
94            RedisSinkType::List {
95                key: "my_list".into(),
96            },
97        );
98        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
99        // Debug should mask URL
100        let debug = format!("{config:?}");
101        assert!(debug.contains("***"));
102        assert!(!debug.contains("127.0.0.1"));
103    }
104
105    #[test]
106    fn with_batch_size_overrides_default() {
107        let config = RedisSinkConfig::new(
108            "redis://localhost",
109            RedisSinkType::Stream {
110                key: "events".into(),
111            },
112        )
113        .with_batch_size(100);
114        assert_eq!(config.batch_size, 100);
115    }
116
117    #[test]
118    fn key_value_sink_type() {
119        let config = RedisSinkConfig::new(
120            "redis://localhost",
121            RedisSinkType::KeyValue {
122                key_field: "id".into(),
123            },
124        );
125        if let RedisSinkType::KeyValue { ref key_field } = config.sink_type {
126            assert_eq!(key_field, "id");
127        } else {
128            panic!("expected KeyValue variant");
129        }
130    }
131
132    #[test]
133    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
134        let config =
135            RedisSinkConfig::new("redis://localhost", RedisSinkType::List { key: "k".into() })
136                .with_batch_size(0);
137        assert_eq!(config.batch_size, 0);
138        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
139    }
140
141    #[test]
142    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
143        let config =
144            RedisSinkConfig::new("redis://localhost", RedisSinkType::List { key: "k".into() })
145                .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
146        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
147    }
148
149    #[test]
150    fn batch_size_deserializes_from_json() {
151        let json = r#"{
152            "url": "redis://localhost",
153            "sink_type": { "type": "List", "key": "items" },
154            "batch_size": 250
155        }"#;
156        let config: RedisSinkConfig = serde_json::from_str(json).unwrap();
157        assert_eq!(config.batch_size, 250);
158    }
159
160    #[test]
161    fn batch_size_defaults_when_absent_from_json() {
162        let json = r#"{
163            "url": "redis://localhost",
164            "sink_type": { "type": "List", "key": "items" }
165        }"#;
166        let config: RedisSinkConfig = serde_json::from_str(json).unwrap();
167        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
168    }
169}