faucet_sink_redis/
sink.rs1use crate::config::{RedisSinkConfig, RedisSinkType};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7
8pub struct RedisSink {
13 config: RedisSinkConfig,
14 conn: redis::aio::MultiplexedConnection,
15}
16
17impl RedisSink {
18 pub async fn new(config: RedisSinkConfig) -> Result<Self, FaucetError> {
22 faucet_core::validate_batch_size(config.batch_size)?;
23 let client = redis::Client::open(config.url.as_str())
24 .map_err(|e| FaucetError::Config(format!("invalid Redis URL: {e}")))?;
25
26 let conn = client
27 .get_multiplexed_async_connection()
28 .await
29 .map_err(|e| FaucetError::Sink(format!("Redis connection failed: {e}")))?;
30
31 Ok(Self { config, conn })
32 }
33}
34
35#[async_trait]
36impl faucet_core::Sink for RedisSink {
37 fn config_schema(&self) -> serde_json::Value {
38 serde_json::to_value(faucet_core::schema_for!(RedisSinkConfig))
39 .expect("schema serialization")
40 }
41
42 async fn check(
45 &self,
46 ctx: &faucet_core::check::CheckContext,
47 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
48 use faucet_core::check::{CheckReport, Probe};
49
50 let mut conn = self.conn.clone();
52 let started = std::time::Instant::now();
53 let hint = "check the Redis url / that the server is reachable and accepting connections";
54
55 let probe = match tokio::time::timeout(
56 ctx.timeout,
57 redis::cmd("PING").query_async::<String>(&mut conn),
58 )
59 .await
60 {
61 Ok(Ok(_)) => Probe::pass("ping", started.elapsed()),
62 Ok(Err(e)) => Probe::fail_hint("ping", started.elapsed(), e.to_string(), hint),
63 Err(_) => Probe::fail_hint("ping", started.elapsed(), "timed out", hint),
64 };
65 Ok(CheckReport::single(probe))
66 }
67
68 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
69 if records.is_empty() {
70 return Ok(0);
71 }
72
73 let mut conn = self.conn.clone();
76 let mut written = 0usize;
77
78 let effective_chunk = if self.config.batch_size == 0 {
84 records.len()
85 } else {
86 self.config.batch_size
87 };
88
89 for chunk in records.chunks(effective_chunk) {
91 let mut pipe = redis::pipe();
92
93 for record in chunk {
94 match &self.config.sink_type {
95 RedisSinkType::List { key } => {
96 let serialized = serde_json::to_string(record).map_err(|e| {
97 FaucetError::Sink(format!("JSON serialization failed: {e}"))
98 })?;
99 pipe.cmd("RPUSH").arg(key.as_str()).arg(serialized);
100 }
101 RedisSinkType::Stream { key } => {
102 let fields = flatten_record_to_fields(record);
103 if fields.is_empty() {
104 let serialized = serde_json::to_string(record).map_err(|e| {
106 FaucetError::Sink(format!("JSON serialization failed: {e}"))
107 })?;
108 pipe.cmd("XADD")
109 .arg(key.as_str())
110 .arg("*")
111 .arg("_data")
112 .arg(serialized);
113 } else {
114 let mut cmd = redis::cmd("XADD");
115 cmd.arg(key.as_str()).arg("*");
116 for (field_name, field_value) in &fields {
117 cmd.arg(field_name.as_str()).arg(field_value.as_str());
118 }
119 pipe.add_command(cmd);
120 }
121 }
122 RedisSinkType::KeyValue { key_field } => {
123 let key = record
124 .get(key_field)
125 .map(|v| match v {
126 Value::String(s) => s.clone(),
127 other => other.to_string(),
128 })
129 .ok_or_else(|| {
130 FaucetError::Sink(format!("record missing key field '{key_field}'"))
131 })?;
132 let serialized = serde_json::to_string(record).map_err(|e| {
133 FaucetError::Sink(format!("JSON serialization failed: {e}"))
134 })?;
135 pipe.cmd("SET").arg(key).arg(serialized);
136 }
137 }
138 }
139
140 pipe.query_async::<()>(&mut conn)
141 .await
142 .map_err(|e| FaucetError::Sink(format!("Redis pipeline execution failed: {e}")))?;
143
144 written += chunk.len();
145 }
146
147 tracing::debug!(records = written, "Redis batch written");
148 Ok(written)
149 }
150}
151
152fn flatten_record_to_fields(record: &Value) -> Vec<(String, String)> {
155 match record.as_object() {
156 Some(map) => map
157 .iter()
158 .map(|(k, v)| {
159 let val = match v {
160 Value::String(s) => s.clone(),
161 other => other.to_string(),
162 };
163 (k.clone(), val)
164 })
165 .collect(),
166 None => Vec::new(),
167 }
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RedisSinkConfig;
    use serde_json::json;

    // A freshly built config carries the crate-wide default batch size.
    #[test]
    fn config_fields_accessible() {
        let sink_type = RedisSinkType::List { key: "test".into() };
        let config = RedisSinkConfig::new("redis://localhost", sink_type);
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    // Strings are kept verbatim and numbers become their JSON text.
    #[test]
    fn flatten_object_record() {
        let fields = flatten_record_to_fields(&json!({"name": "Alice", "age": 30}));
        assert_eq!(fields.len(), 2);
        assert!(fields.contains(&("name".to_string(), "Alice".to_string())));
        assert!(fields.contains(&("age".to_string(), "30".to_string())));
    }

    // Anything that is not a JSON object produces no fields.
    #[test]
    fn flatten_non_object_returns_empty() {
        let fields = flatten_record_to_fields(&json!("just a string"));
        assert!(fields.is_empty());
    }

    // Nested values are stored as their compact JSON text.
    #[test]
    fn flatten_nested_value_serializes_as_json() {
        let fields = flatten_record_to_fields(&json!({"data": {"nested": true}}));
        let expected = vec![("data".to_string(), r#"{"nested":true}"#.to_string())];
        assert_eq!(fields, expected);
    }

    // The batch size is validated before any connection attempt, so this test
    // needs no running Redis server.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let sink_type = RedisSinkType::List { key: "k".into() };
        let mut config = RedisSinkConfig::new("redis://localhost", sink_type);
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let outcome = RedisSink::new(config).await;
        match outcome {
            Err(faucet_core::FaucetError::Config(m)) => {
                assert!(m.contains("batch_size"), "got: {m}")
            }
            _ => panic!("expected a batch_size Config error"),
        }
    }
}