Skip to main content

faucet_sink_redis/
sink.rs

1//! Redis sink executor.
2
3use crate::config::{RedisSinkConfig, RedisSinkType};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7
8/// A configured Redis sink that writes records to Redis data structures.
9///
10/// The connection is established once during construction and reused across
11/// all `write_batch()` calls.
12pub struct RedisSink {
13    config: RedisSinkConfig,
14    conn: redis::aio::MultiplexedConnection,
15}
16
17impl RedisSink {
18    /// Create a new Redis sink from the given configuration.
19    ///
20    /// This opens a multiplexed async connection to Redis immediately.
21    pub async fn new(config: RedisSinkConfig) -> Result<Self, FaucetError> {
22        faucet_core::validate_batch_size(config.batch_size)?;
23        let client = redis::Client::open(config.url.as_str())
24            .map_err(|e| FaucetError::Config(format!("invalid Redis URL: {e}")))?;
25
26        let conn = client
27            .get_multiplexed_async_connection()
28            .await
29            .map_err(|e| FaucetError::Sink(format!("Redis connection failed: {e}")))?;
30
31        Ok(Self { config, conn })
32    }
33}
34
35#[async_trait]
36impl faucet_core::Sink for RedisSink {
37    fn config_schema(&self) -> serde_json::Value {
38        serde_json::to_value(faucet_core::schema_for!(RedisSinkConfig))
39            .expect("schema serialization")
40    }
41
42    /// Non-mutating preflight probe: issue a Redis `PING` over the existing
43    /// multiplexed connection (probe name `"ping"`).
44    async fn check(
45        &self,
46        ctx: &faucet_core::check::CheckContext,
47    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
48        use faucet_core::check::{CheckReport, Probe};
49
50        // MultiplexedConnection is cheaply cloneable; clone to satisfy &self.
51        let mut conn = self.conn.clone();
52        let started = std::time::Instant::now();
53        let hint = "check the Redis url / that the server is reachable and accepting connections";
54
55        let probe = match tokio::time::timeout(
56            ctx.timeout,
57            redis::cmd("PING").query_async::<String>(&mut conn),
58        )
59        .await
60        {
61            Ok(Ok(_)) => Probe::pass("ping", started.elapsed()),
62            Ok(Err(e)) => Probe::fail_hint("ping", started.elapsed(), e.to_string(), hint),
63            Err(_) => Probe::fail_hint("ping", started.elapsed(), "timed out", hint),
64        };
65        Ok(CheckReport::single(probe))
66    }
67
68    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
69        if records.is_empty() {
70            return Ok(0);
71        }
72
73        // MultiplexedConnection is cheaply cloneable (it shares the
74        // underlying connection), so we clone to satisfy the &self receiver.
75        let mut conn = self.conn.clone();
76        let mut written = 0usize;
77
78        // `batch_size = 0` is the "no batching" sentinel: pack the entire
79        // upstream slice into a single Redis pipeline, preserving
80        // `StreamPage` framing. Otherwise re-chunk into `batch_size`-sized
81        // slices so each Redis pipeline stays near the recommended
82        // ~1000-command working set.
83        let effective_chunk = if self.config.batch_size == 0 {
84            records.len()
85        } else {
86            self.config.batch_size
87        };
88
89        // Process in chunks of batch_size using redis pipelines.
90        for chunk in records.chunks(effective_chunk) {
91            let mut pipe = redis::pipe();
92
93            for record in chunk {
94                match &self.config.sink_type {
95                    RedisSinkType::List { key } => {
96                        let serialized = serde_json::to_string(record).map_err(|e| {
97                            FaucetError::Sink(format!("JSON serialization failed: {e}"))
98                        })?;
99                        pipe.cmd("RPUSH").arg(key.as_str()).arg(serialized);
100                    }
101                    RedisSinkType::Stream { key } => {
102                        let fields = flatten_record_to_fields(record);
103                        if fields.is_empty() {
104                            // XADD requires at least one field.
105                            let serialized = serde_json::to_string(record).map_err(|e| {
106                                FaucetError::Sink(format!("JSON serialization failed: {e}"))
107                            })?;
108                            pipe.cmd("XADD")
109                                .arg(key.as_str())
110                                .arg("*")
111                                .arg("_data")
112                                .arg(serialized);
113                        } else {
114                            let mut cmd = redis::cmd("XADD");
115                            cmd.arg(key.as_str()).arg("*");
116                            for (field_name, field_value) in &fields {
117                                cmd.arg(field_name.as_str()).arg(field_value.as_str());
118                            }
119                            pipe.add_command(cmd);
120                        }
121                    }
122                    RedisSinkType::KeyValue { key_field } => {
123                        let key = record
124                            .get(key_field)
125                            .map(|v| match v {
126                                Value::String(s) => s.clone(),
127                                other => other.to_string(),
128                            })
129                            .ok_or_else(|| {
130                                FaucetError::Sink(format!("record missing key field '{key_field}'"))
131                            })?;
132                        let serialized = serde_json::to_string(record).map_err(|e| {
133                            FaucetError::Sink(format!("JSON serialization failed: {e}"))
134                        })?;
135                        pipe.cmd("SET").arg(key).arg(serialized);
136                    }
137                }
138            }
139
140            pipe.query_async::<()>(&mut conn)
141                .await
142                .map_err(|e| FaucetError::Sink(format!("Redis pipeline execution failed: {e}")))?;
143
144            written += chunk.len();
145        }
146
147        tracing::debug!(records = written, "Redis batch written");
148        Ok(written)
149    }
150}
151
152/// Flatten a JSON record's top-level fields into string key-value pairs
153/// suitable for Redis stream entries.
154fn flatten_record_to_fields(record: &Value) -> Vec<(String, String)> {
155    match record.as_object() {
156        Some(map) => map
157            .iter()
158            .map(|(k, v)| {
159                let val = match v {
160                    Value::String(s) => s.clone(),
161                    other => other.to_string(),
162                };
163                (k.clone(), val)
164            })
165            .collect(),
166        None => Vec::new(),
167    }
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RedisSinkConfig;
    use serde_json::json;

    /// A freshly built config carries the crate-wide default batch size.
    #[test]
    fn config_fields_accessible() {
        // `RedisSink::new()` is async and needs a live Redis server, so this
        // test only inspects the configuration value.
        let sink_type = RedisSinkType::List { key: "test".into() };
        let config = RedisSinkConfig::new("redis://localhost", sink_type);
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// Top-level scalars become string pairs; strings lose their quotes.
    #[test]
    fn flatten_object_record() {
        let fields = flatten_record_to_fields(&json!({"name": "Alice", "age": 30}));
        assert_eq!(fields.len(), 2);
        assert!(fields.contains(&("name".to_string(), "Alice".to_string())));
        assert!(fields.contains(&("age".to_string(), "30".to_string())));
    }

    /// Anything that is not a JSON object yields no fields.
    #[test]
    fn flatten_non_object_returns_empty() {
        let fields = flatten_record_to_fields(&json!("just a string"));
        assert!(fields.is_empty());
    }

    /// Nested values are kept as compact JSON text.
    #[test]
    fn flatten_nested_value_serializes_as_json() {
        let fields = flatten_record_to_fields(&json!({"data": {"nested": true}}));
        let expected = vec![("data".to_string(), r#"{"nested":true}"#.to_string())];
        assert_eq!(fields, expected);
    }

    /// Batch-size validation runs before any connection attempt, so an
    /// oversized value is rejected without a Redis server being available.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config =
            RedisSinkConfig::new("redis://localhost", RedisSinkType::List { key: "k".into() });
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        if let Err(faucet_core::FaucetError::Config(m)) = RedisSink::new(config).await {
            assert!(m.contains("batch_size"), "got: {m}");
        } else {
            panic!("expected a batch_size Config error");
        }
    }
}