1use crate::config::{RedisSourceConfig, RedisSourceType};
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use redis::AsyncCommands;
7use serde_json::{Value, json};
8use std::pin::Pin;
9
10pub struct RedisSource {
12 config: RedisSourceConfig,
13 conn: tokio::sync::OnceCell<redis::aio::MultiplexedConnection>,
18}
19
20impl RedisSource {
21 pub fn new(config: RedisSourceConfig) -> Result<Self, FaucetError> {
25 faucet_core::validate_batch_size(config.batch_size)?;
26 Ok(Self {
27 config,
28 conn: tokio::sync::OnceCell::new(),
29 })
30 }
31
32 async fn connection(&self) -> Result<redis::aio::MultiplexedConnection, FaucetError> {
35 let conn = self
36 .conn
37 .get_or_try_init(|| async {
38 let client = redis::Client::open(self.config.url.as_str())
39 .map_err(|e| FaucetError::Config(format!("invalid Redis URL: {e}")))?;
40 client
41 .get_multiplexed_async_connection()
42 .await
43 .map_err(|e| FaucetError::Source(format!("Redis connection failed: {e}")))
44 })
45 .await?;
46 Ok(conn.clone())
47 }
48
49 pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
51 let mut conn = self.connection().await?;
52
53 let mut records = match &self.config.source_type {
54 RedisSourceType::List { key } => self.fetch_list(&mut conn, key).await?,
55 RedisSourceType::Stream {
56 key,
57 group,
58 consumer,
59 count,
60 } => {
61 self.fetch_stream(&mut conn, key, group, consumer, count)
62 .await?
63 }
64 RedisSourceType::Keys { pattern } => self.fetch_keys(&mut conn, pattern).await?,
65 };
66
67 if let Some(max) = self.config.max_records {
68 records.truncate(max);
69 }
70
71 tracing::info!(records = records.len(), "Redis fetch complete");
72 Ok(records)
73 }
74
75 async fn fetch_list(
77 &self,
78 conn: &mut redis::aio::MultiplexedConnection,
79 key: &str,
80 ) -> Result<Vec<Value>, FaucetError> {
81 let values: Vec<String> = conn
82 .lrange(key, 0, -1)
83 .await
84 .map_err(|e| FaucetError::Source(format!("LRANGE failed on '{key}': {e}")))?;
85
86 let records = values
87 .into_iter()
88 .map(|v| serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone())))
89 .collect();
90
91 Ok(records)
92 }
93
94 async fn fetch_stream(
96 &self,
97 conn: &mut redis::aio::MultiplexedConnection,
98 key: &str,
99 group: &Option<String>,
100 consumer: &Option<String>,
101 count: &Option<usize>,
102 ) -> Result<Vec<Value>, FaucetError> {
103 let mut records = Vec::new();
104 match (group, consumer) {
105 (Some(group_name), Some(consumer_name)) => {
106 let per_read = count.unwrap_or(100).max(1);
112 loop {
113 let opts = redis::streams::StreamReadOptions::default().count(per_read);
114 let reply: redis::streams::StreamReadReply = conn
115 .xread_options(&[key], &[">"], &opts.group(group_name, consumer_name))
116 .await
117 .map_err(|e| {
118 FaucetError::Source(format!("XREADGROUP failed on '{key}': {e}"))
119 })?;
120 let mut got = 0usize;
121 for stream_key in &reply.keys {
122 for entry in &stream_key.ids {
123 records.push(stream_entry_to_json(&entry.id, &entry.map));
124 got += 1;
125 }
126 }
127 if got < per_read {
131 break;
132 }
133 if let Some(max) = self.config.max_records
134 && records.len() >= max
135 {
136 break;
137 }
138 }
139 }
140 _ => {
141 let mut opts = redis::streams::StreamReadOptions::default();
144 if let Some(c) = count {
145 opts = opts.count(*c);
146 }
147 let reply: redis::streams::StreamReadReply = conn
148 .xread_options(&[key], &["0"], &opts)
149 .await
150 .map_err(|e| FaucetError::Source(format!("XREAD failed on '{key}': {e}")))?;
151 for stream_key in &reply.keys {
152 for entry in &stream_key.ids {
153 records.push(stream_entry_to_json(&entry.id, &entry.map));
154 }
155 }
156 }
157 }
158
159 Ok(records)
160 }
161
162 async fn fetch_keys(
164 &self,
165 conn: &mut redis::aio::MultiplexedConnection,
166 pattern: &str,
167 ) -> Result<Vec<Value>, FaucetError> {
168 let keys: Vec<String> = {
169 let mut collected = Vec::new();
170 let mut iter: redis::AsyncIter<String> =
171 conn.scan_match(pattern).await.map_err(|e| {
172 FaucetError::Source(format!("SCAN failed with pattern '{pattern}': {e}"))
173 })?;
174
175 while let Some(key) = iter.next_item().await {
176 collected.push(key);
177 }
178 collected
179 };
180
181 if keys.is_empty() {
182 return Ok(Vec::new());
183 }
184
185 let values: Vec<Option<String>> = redis::cmd("MGET")
186 .arg(&keys)
187 .query_async(conn)
188 .await
189 .map_err(|e| FaucetError::Source(format!("MGET failed: {e}")))?;
190
191 let mut records = Vec::new();
192 for (key, value) in keys.iter().zip(values.into_iter()) {
193 if let Some(v) = value {
194 let parsed =
195 serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone()));
196 records.push(json!({
197 "key": key,
198 "value": parsed,
199 }));
200 }
201 }
202
203 Ok(records)
204 }
205}
206
207fn stream_entry_to_json(id: &str, map: &std::collections::HashMap<String, redis::Value>) -> Value {
210 let mut fields = serde_json::Map::new();
211 for (field_name, field_value) in map {
212 let val = match field_value {
213 redis::Value::BulkString(bytes) => {
214 let s = String::from_utf8_lossy(bytes);
215 serde_json::from_str::<Value>(&s).unwrap_or_else(|_| Value::String(s.into_owned()))
216 }
217 redis::Value::SimpleString(s) => {
218 serde_json::from_str::<Value>(s).unwrap_or_else(|_| Value::String(s.clone()))
219 }
220 redis::Value::Int(n) => json!(n),
221 redis::Value::Double(n) => json!(n),
222 redis::Value::Boolean(b) => json!(b),
223 redis::Value::Nil => Value::Null,
224 other => Value::String(format!("{other:?}")),
225 };
226 fields.insert(field_name.clone(), val);
227 }
228 json!({
229 "id": id,
230 "fields": Value::Object(fields),
231 })
232}
233
/// Returns the smallest stream ID strictly after `id`, for use as the start of
/// the next range query.
///
/// For a well-formed `<ms>-<seq>` ID the sequence is incremented; when the
/// sequence is already `u64::MAX` the result rolls over to `<ms + 1>-0`, with
/// the millisecond part saturating at `u64::MAX`. Any ID that does not parse
/// as two `u64` parts is returned with a NUL character appended.
fn next_stream_id(id: &str) -> String {
    let parts = id.split_once('-').and_then(|(ms, seq)| {
        let ms = ms.parse::<u64>().ok()?;
        let seq = seq.parse::<u64>().ok()?;
        Some((ms, seq))
    });

    match parts {
        Some((ms, seq)) => match seq.checked_add(1) {
            Some(next_seq) => format!("{ms}-{next_seq}"),
            None => format!("{}-0", ms.saturating_add(1)),
        },
        None => format!("{id}\u{0}"),
    }
}
253
254#[async_trait]
255impl faucet_core::Source for RedisSource {
256 async fn fetch_with_context(
257 &self,
258 context: &std::collections::HashMap<String, serde_json::Value>,
259 ) -> Result<Vec<Value>, FaucetError> {
260 if context.is_empty() {
261 return RedisSource::fetch_all(self).await;
262 }
263
264 let mut conn = self.connection().await?;
265
266 let mut records = match &self.config.source_type {
268 RedisSourceType::List { key } => {
269 let resolved_key = faucet_core::util::substitute_context(key, context);
270 self.fetch_list(&mut conn, &resolved_key).await?
271 }
272 RedisSourceType::Stream {
273 key,
274 group,
275 consumer,
276 count,
277 } => {
278 let resolved_key = faucet_core::util::substitute_context(key, context);
279 self.fetch_stream(&mut conn, &resolved_key, group, consumer, count)
280 .await?
281 }
282 RedisSourceType::Keys { pattern } => {
283 let resolved_pattern = faucet_core::util::substitute_context(pattern, context);
284 self.fetch_keys(&mut conn, &resolved_pattern).await?
285 }
286 };
287
288 if let Some(max) = self.config.max_records {
289 records.truncate(max);
290 }
291
292 tracing::info!(
293 records = records.len(),
294 "Redis fetch complete (with context)"
295 );
296 Ok(records)
297 }
298
299 fn stream_pages<'a>(
313 &'a self,
314 context: &'a std::collections::HashMap<String, Value>,
315 _batch_size: usize,
316 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
317 let batch_size = self.config.batch_size;
318 let max_records = self.config.max_records;
319
320 Box::pin(async_stream::try_stream! {
321 let mut conn = self.connection().await?;
322
323 let mut emitted: usize = 0;
324
325 match &self.config.source_type {
326 RedisSourceType::List { key } => {
327 let resolved = if context.is_empty() {
328 key.clone()
329 } else {
330 faucet_core::util::substitute_context(key, context)
331 };
332 let pages = stream_list(&mut conn, &resolved, batch_size, max_records);
333 futures::pin_mut!(pages);
334 while let Some(page) = futures::StreamExt::next(&mut pages).await {
335 let page = page?;
336 emitted += page.records.len();
337 yield page;
338 }
339 }
340 RedisSourceType::Stream { key, .. } => {
341 let resolved = if context.is_empty() {
347 key.clone()
348 } else {
349 faucet_core::util::substitute_context(key, context)
350 };
351 let pages = stream_xrange(&mut conn, &resolved, batch_size, max_records);
352 futures::pin_mut!(pages);
353 while let Some(page) = futures::StreamExt::next(&mut pages).await {
354 let page = page?;
355 emitted += page.records.len();
356 yield page;
357 }
358 }
359 RedisSourceType::Keys { pattern } => {
360 let resolved = if context.is_empty() {
361 pattern.clone()
362 } else {
363 faucet_core::util::substitute_context(pattern, context)
364 };
365 let pages = stream_keys(&mut conn, &resolved, batch_size, max_records);
366 futures::pin_mut!(pages);
367 while let Some(page) = futures::StreamExt::next(&mut pages).await {
368 let page = page?;
369 emitted += page.records.len();
370 yield page;
371 }
372 }
373 }
374
375 tracing::info!(
376 records = emitted,
377 batch_size,
378 "Redis source stream complete",
379 );
380 })
381 }
382
383 fn config_schema(&self) -> serde_json::Value {
384 serde_json::to_value(faucet_core::schema_for!(RedisSourceConfig))
385 .expect("schema serialization")
386 }
387}
388
389fn stream_list<'a>(
400 conn: &'a mut redis::aio::MultiplexedConnection,
401 key: &'a str,
402 batch_size: usize,
403 max_records: Option<usize>,
404) -> impl Stream<Item = Result<StreamPage, FaucetError>> + 'a {
405 async_stream::try_stream! {
406 if batch_size == 0 {
407 let values: Vec<String> = conn
408 .lrange(key, 0, -1)
409 .await
410 .map_err(|e| FaucetError::Source(format!("LRANGE failed on '{key}': {e}")))?;
411 let mut records: Vec<Value> = values
412 .into_iter()
413 .map(|v| serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone())))
414 .collect();
415 if let Some(max) = max_records {
416 records.truncate(max);
417 }
418 yield StreamPage { records, bookmark: None };
419 return;
420 }
421
422 let mut start: isize = 0;
423 let mut emitted: usize = 0;
424 loop {
425 let stop: isize = start + batch_size as isize - 1;
426 let values: Vec<String> = conn
427 .lrange(key, start, stop)
428 .await
429 .map_err(|e| FaucetError::Source(format!("LRANGE failed on '{key}': {e}")))?;
430 if values.is_empty() {
431 break;
432 }
433 let mut records: Vec<Value> = values
434 .into_iter()
435 .map(|v| serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone())))
436 .collect();
437 let returned = records.len();
438 let mut stop_after_yield = false;
440 if let Some(max) = max_records
441 && emitted + records.len() >= max
442 {
443 records.truncate(max - emitted);
444 stop_after_yield = true;
445 }
446 emitted += records.len();
447 yield StreamPage { records, bookmark: None };
448 if stop_after_yield || returned < batch_size {
449 break;
450 }
451 start += batch_size as isize;
452 }
453 }
454}
455
456fn stream_xrange<'a>(
460 conn: &'a mut redis::aio::MultiplexedConnection,
461 key: &'a str,
462 batch_size: usize,
463 max_records: Option<usize>,
464) -> impl Stream<Item = Result<StreamPage, FaucetError>> + 'a {
465 async_stream::try_stream! {
466 if batch_size == 0 {
467 let reply: redis::streams::StreamRangeReply = conn
468 .xrange_all(key)
469 .await
470 .map_err(|e| FaucetError::Source(format!("XRANGE failed on '{key}': {e}")))?;
471 let mut records: Vec<Value> = reply
472 .ids
473 .iter()
474 .map(|entry| stream_entry_to_json(&entry.id, &entry.map))
475 .collect();
476 if let Some(max) = max_records {
477 records.truncate(max);
478 }
479 yield StreamPage { records, bookmark: None };
480 return;
481 }
482
483 let mut start: String = "-".to_string();
484 let mut emitted: usize = 0;
485 loop {
486 let reply: redis::streams::StreamRangeReply = conn
487 .xrange_count(key, &start, "+", batch_size)
488 .await
489 .map_err(|e| FaucetError::Source(format!("XRANGE failed on '{key}': {e}")))?;
490
491 if reply.ids.is_empty() {
492 break;
493 }
494
495 let last_id = reply
498 .ids
499 .last()
500 .expect("non-empty checked above")
501 .id
502 .clone();
503 let returned = reply.ids.len();
504 let mut records: Vec<Value> = reply
505 .ids
506 .into_iter()
507 .map(|entry| stream_entry_to_json(&entry.id, &entry.map))
508 .collect();
509
510 let mut stop_after_yield = false;
511 if let Some(max) = max_records
512 && emitted + records.len() >= max
513 {
514 records.truncate(max - emitted);
515 stop_after_yield = true;
516 }
517 emitted += records.len();
518 yield StreamPage { records, bookmark: None };
519
520 if stop_after_yield || returned < batch_size {
521 break;
522 }
523 start = next_stream_id(&last_id);
524 }
525 }
526}
527
528fn stream_keys<'a>(
534 conn: &'a mut redis::aio::MultiplexedConnection,
535 pattern: &'a str,
536 batch_size: usize,
537 max_records: Option<usize>,
538) -> impl Stream<Item = Result<StreamPage, FaucetError>> + 'a {
539 use faucet_core::DEFAULT_BATCH_SIZE;
540 async_stream::try_stream! {
541 let scan_hint = if batch_size == 0 { DEFAULT_BATCH_SIZE } else { batch_size };
548 let chunk_size = if batch_size == 0 { usize::MAX } else { batch_size };
551 let cap = max_records.unwrap_or(usize::MAX);
552
553 let mut cursor: u64 = 0;
554 let mut buffer: Vec<String> = Vec::new();
555 let mut emitted: usize = 0;
556
557 'scan: loop {
558 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
559 .arg(cursor)
560 .arg("MATCH")
561 .arg(pattern)
562 .arg("COUNT")
563 .arg(scan_hint)
564 .query_async(conn)
565 .await
566 .map_err(|e| FaucetError::Source(format!("SCAN failed with pattern '{pattern}': {e}")))?;
567 cursor = next_cursor;
568 buffer.extend(keys);
569
570 while emitted < cap && buffer.len() >= chunk_size {
572 let take = chunk_size.min(cap - emitted);
573 let page_keys: Vec<String> = buffer.drain(..take).collect();
574 let records = mget_records(conn, &page_keys).await?;
575 emitted += records.len();
576 yield StreamPage { records, bookmark: None };
577 }
578
579 if cursor == 0 || emitted >= cap {
580 break 'scan;
581 }
582 }
583
584 if emitted < cap && !buffer.is_empty() {
586 let take = (cap - emitted).min(buffer.len());
587 let page_keys: Vec<String> = buffer.drain(..take).collect();
588 let records = mget_records(conn, &page_keys).await?;
589 yield StreamPage { records, bookmark: None };
590 }
591 }
592}
593
594async fn mget_records(
597 conn: &mut redis::aio::MultiplexedConnection,
598 keys: &[String],
599) -> Result<Vec<Value>, FaucetError> {
600 let values: Vec<Option<String>> = redis::cmd("MGET")
601 .arg(keys)
602 .query_async(conn)
603 .await
604 .map_err(|e| FaucetError::Source(format!("MGET failed: {e}")))?;
605 Ok(collect_kv_records(keys, values))
606}
607
608fn collect_kv_records(keys: &[String], values: Vec<Option<String>>) -> Vec<Value> {
612 keys.iter()
613 .zip(values)
614 .filter_map(|(key, value)| {
615 value.map(|v| {
616 let parsed =
617 serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone()));
618 json!({ "key": key, "value": parsed })
619 })
620 })
621 .collect()
622}
623
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RedisSourceConfig;

    /// Minimal list-backed configuration shared by the constructor tests.
    fn list_config() -> RedisSourceConfig {
        RedisSourceConfig::new(
            "redis://localhost",
            RedisSourceType::List { key: "test".into() },
        )
    }

    #[test]
    fn creates_source() {
        assert!(RedisSource::new(list_config()).is_ok());
    }

    #[test]
    fn new_rejects_out_of_range_batch_size() {
        let mut config = list_config();
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let outcome = RedisSource::new(config);
        match outcome {
            Err(FaucetError::Config(m)) => assert!(m.contains("batch_size"), "got: {m}"),
            other => panic!(
                "expected a batch_size Config error, got {:?}",
                other.is_ok()
            ),
        }
    }

    #[test]
    fn next_stream_id_increments_sequence() {
        let cases = [("1234-0", "1234-1"), ("1234-99", "1234-100")];
        for (current, expected) in cases {
            assert_eq!(next_stream_id(current), expected);
        }
    }

    #[test]
    fn next_stream_id_wraps_seq_overflow() {
        // A sequence at u64::MAX rolls over into the next millisecond.
        let saturated = format!("5-{}", u64::MAX);
        assert_eq!(next_stream_id(&saturated), "6-0");
    }

    #[test]
    fn next_stream_id_falls_back_on_malformed_id() {
        let fallback = next_stream_id("not-a-real-id");
        assert!(fallback.starts_with("not-a-real-id"));
        assert!(fallback.ends_with('\u{0}'));
    }

    #[test]
    fn stream_entry_to_json_extracts_id_and_fields() {
        let fields = std::collections::HashMap::from([
            (
                "field1".to_string(),
                redis::Value::BulkString(b"value1".to_vec()),
            ),
            ("field2".to_string(), redis::Value::Int(42)),
        ]);

        let record = stream_entry_to_json("100-0", &fields);

        assert_eq!(record["id"], "100-0");
        assert_eq!(record["fields"]["field1"], "value1");
        assert_eq!(record["fields"]["field2"], 42);
    }
}