use crate::config::{RedisSourceConfig, RedisSourceType};
use async_trait::async_trait;
use faucet_core::{FaucetError, Stream, StreamPage};
use redis::AsyncCommands;
use serde_json::{Value, json};
use std::pin::Pin;
/// Source connector that reads records from Redis.
///
/// Depending on the configured [`RedisSourceType`], records come from a list,
/// a stream, or the values of the keys matching a `SCAN` pattern.
pub struct RedisSource {
    /// Connection URL, source kind, batch size and optional record limit.
    config: RedisSourceConfig,
    /// Multiplexed connection, opened lazily on first use and then reused by
    /// every fetch (each caller receives a clone of the cached handle).
    conn: tokio::sync::OnceCell<redis::aio::MultiplexedConnection>,
}
impl RedisSource {
pub fn new(config: RedisSourceConfig) -> Result<Self, FaucetError> {
faucet_core::validate_batch_size(config.batch_size)?;
Ok(Self {
config,
conn: tokio::sync::OnceCell::new(),
})
}
async fn connection(&self) -> Result<redis::aio::MultiplexedConnection, FaucetError> {
let conn = self
.conn
.get_or_try_init(|| async {
let client = redis::Client::open(self.config.url.as_str())
.map_err(|e| FaucetError::Config(format!("invalid Redis URL: {e}")))?;
client
.get_multiplexed_async_connection()
.await
.map_err(|e| FaucetError::Source(format!("Redis connection failed: {e}")))
})
.await?;
Ok(conn.clone())
}
pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
let mut conn = self.connection().await?;
let mut records = match &self.config.source_type {
RedisSourceType::List { key } => self.fetch_list(&mut conn, key).await?,
RedisSourceType::Stream {
key,
group,
consumer,
count,
} => {
self.fetch_stream(&mut conn, key, group, consumer, count)
.await?
}
RedisSourceType::Keys { pattern } => self.fetch_keys(&mut conn, pattern).await?,
};
if let Some(max) = self.config.max_records {
records.truncate(max);
}
tracing::info!(records = records.len(), "Redis fetch complete");
Ok(records)
}
async fn fetch_list(
&self,
conn: &mut redis::aio::MultiplexedConnection,
key: &str,
) -> Result<Vec<Value>, FaucetError> {
let values: Vec<String> = conn
.lrange(key, 0, -1)
.await
.map_err(|e| FaucetError::Source(format!("LRANGE failed on '{key}': {e}")))?;
let records = values
.into_iter()
.map(|v| serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone())))
.collect();
Ok(records)
}
async fn fetch_stream(
&self,
conn: &mut redis::aio::MultiplexedConnection,
key: &str,
group: &Option<String>,
consumer: &Option<String>,
count: &Option<usize>,
) -> Result<Vec<Value>, FaucetError> {
let mut records = Vec::new();
match (group, consumer) {
(Some(group_name), Some(consumer_name)) => {
let per_read = count.unwrap_or(100).max(1);
loop {
let opts = redis::streams::StreamReadOptions::default().count(per_read);
let reply: redis::streams::StreamReadReply = conn
.xread_options(&[key], &[">"], &opts.group(group_name, consumer_name))
.await
.map_err(|e| {
FaucetError::Source(format!("XREADGROUP failed on '{key}': {e}"))
})?;
let mut got = 0usize;
for stream_key in &reply.keys {
for entry in &stream_key.ids {
records.push(stream_entry_to_json(&entry.id, &entry.map));
got += 1;
}
}
if got < per_read {
break;
}
if let Some(max) = self.config.max_records
&& records.len() >= max
{
break;
}
}
}
_ => {
let mut opts = redis::streams::StreamReadOptions::default();
if let Some(c) = count {
opts = opts.count(*c);
}
let reply: redis::streams::StreamReadReply = conn
.xread_options(&[key], &["0"], &opts)
.await
.map_err(|e| FaucetError::Source(format!("XREAD failed on '{key}': {e}")))?;
for stream_key in &reply.keys {
for entry in &stream_key.ids {
records.push(stream_entry_to_json(&entry.id, &entry.map));
}
}
}
}
Ok(records)
}
async fn fetch_keys(
&self,
conn: &mut redis::aio::MultiplexedConnection,
pattern: &str,
) -> Result<Vec<Value>, FaucetError> {
let keys: Vec<String> = {
let mut collected = Vec::new();
let mut iter: redis::AsyncIter<String> =
conn.scan_match(pattern).await.map_err(|e| {
FaucetError::Source(format!("SCAN failed with pattern '{pattern}': {e}"))
})?;
while let Some(key) = iter.next_item().await {
collected.push(key);
}
collected
};
if keys.is_empty() {
return Ok(Vec::new());
}
let values: Vec<Option<String>> = redis::cmd("MGET")
.arg(&keys)
.query_async(conn)
.await
.map_err(|e| FaucetError::Source(format!("MGET failed: {e}")))?;
let mut records = Vec::new();
for (key, value) in keys.iter().zip(values.into_iter()) {
if let Some(v) = value {
let parsed =
serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone()));
records.push(json!({
"key": key,
"value": parsed,
}));
}
}
Ok(records)
}
}
/// Converts one stream entry into `{"id": <id>, "fields": {<name>: <value>, ...}}`.
///
/// Field values are mapped as follows:
/// * bulk strings (decoded as lossy UTF-8) and simple strings are parsed as
///   JSON when possible and otherwise kept as JSON strings;
/// * integers, doubles and booleans become the matching JSON value;
/// * nil becomes `null`;
/// * any other reply kind is stored as its `Debug` rendering.
fn stream_entry_to_json(id: &str, map: &std::collections::HashMap<String, redis::Value>) -> Value {
    // Text replies may themselves carry JSON; otherwise keep the raw text.
    let parse_or_keep = |text: &str| {
        serde_json::from_str::<Value>(text).unwrap_or_else(|_| Value::String(text.to_owned()))
    };

    let fields: serde_json::Map<String, Value> = map
        .iter()
        .map(|(name, raw)| {
            let converted = match raw {
                redis::Value::BulkString(bytes) => {
                    parse_or_keep(&*String::from_utf8_lossy(bytes))
                }
                redis::Value::SimpleString(text) => parse_or_keep(text.as_str()),
                redis::Value::Int(n) => json!(n),
                redis::Value::Double(n) => json!(n),
                redis::Value::Boolean(b) => json!(b),
                redis::Value::Nil => Value::Null,
                other => Value::String(format!("{other:?}")),
            };
            (name.clone(), converted)
        })
        .collect();

    json!({
        "id": id,
        "fields": Value::Object(fields),
    })
}
/// Computes the ID that directly follows `id`, used as the inclusive start of
/// the next `XRANGE` page.
///
/// For a well-formed `<ms>-<seq>` ID the sequence is bumped by one. When the
/// sequence is already `u64::MAX` it rolls over to `<ms + 1>-0`, with the
/// millisecond part saturating at `u64::MAX`. Any input that does not parse as
/// two `u64` parts separated by the first `-` gets a NUL character appended as
/// a best-effort fallback.
fn next_stream_id(id: &str) -> String {
    let parts = id
        .split_once('-')
        .and_then(|(ms, seq)| Some((ms.parse::<u64>().ok()?, seq.parse::<u64>().ok()?)));

    match parts {
        Some((ms, u64::MAX)) => format!("{}-0", ms.saturating_add(1)),
        Some((ms, seq)) => format!("{ms}-{}", seq + 1),
        None => format!("{id}\u{0}"),
    }
}
#[async_trait]
impl faucet_core::Source for RedisSource {
    /// Fetches all records after substituting `context` values into the
    /// configured list key, stream key, or key pattern.
    ///
    /// An empty `context` delegates to `RedisSource::fetch_all`. Otherwise the
    /// same per-type fetch runs against the substituted name, and the result is
    /// truncated to `max_records` when that limit is set.
    async fn fetch_with_context(
        &self,
        context: &std::collections::HashMap<String, serde_json::Value>,
    ) -> Result<Vec<Value>, FaucetError> {
        if context.is_empty() {
            return RedisSource::fetch_all(self).await;
        }
        let mut conn = self.connection().await?;
        let mut records = match &self.config.source_type {
            RedisSourceType::List { key } => {
                let resolved_key = faucet_core::util::substitute_context(key, context);
                self.fetch_list(&mut conn, &resolved_key).await?
            }
            RedisSourceType::Stream {
                key,
                group,
                consumer,
                count,
            } => {
                let resolved_key = faucet_core::util::substitute_context(key, context);
                self.fetch_stream(&mut conn, &resolved_key, group, consumer, count)
                    .await?
            }
            RedisSourceType::Keys { pattern } => {
                let resolved_pattern = faucet_core::util::substitute_context(pattern, context);
                self.fetch_keys(&mut conn, &resolved_pattern).await?
            }
        };
        if let Some(max) = self.config.max_records {
            records.truncate(max);
        }
        tracing::info!(
            records = records.len(),
            "Redis fetch complete (with context)"
        );
        Ok(records)
    }

    /// Streams records as pages.
    ///
    /// The page size comes from `config.batch_size`; the `_batch_size`
    /// argument is not used. `context` values are substituted into the
    /// key/pattern when `context` is non-empty.
    ///
    /// * Lists are paged with `LRANGE` (`stream_list`).
    /// * Streams configured with both `group` and `consumer` are read through
    ///   `XREADGROUP` exactly as `fetch_all` does (`fetch_stream`), capped at
    ///   `max_records`. The result is split into pages of `batch_size` records,
    ///   or one page when `batch_size == 0`. Other streams are paged from the
    ///   beginning with `XRANGE` (`stream_xrange`).
    /// * Key patterns are paged with `SCAN` + `MGET` (`stream_keys`).
    fn stream_pages<'a>(
        &'a self,
        context: &'a std::collections::HashMap<String, Value>,
        _batch_size: usize,
    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
        let batch_size = self.config.batch_size;
        let max_records = self.config.max_records;
        Box::pin(async_stream::try_stream! {
            let mut conn = self.connection().await?;
            let mut emitted: usize = 0;
            match &self.config.source_type {
                RedisSourceType::List { key } => {
                    let resolved = if context.is_empty() {
                        key.clone()
                    } else {
                        faucet_core::util::substitute_context(key, context)
                    };
                    let pages = stream_list(&mut conn, &resolved, batch_size, max_records);
                    futures::pin_mut!(pages);
                    while let Some(page) = futures::StreamExt::next(&mut pages).await {
                        let page = page?;
                        emitted += page.records.len();
                        yield page;
                    }
                }
                RedisSourceType::Stream {
                    key,
                    group,
                    consumer,
                    count,
                } => {
                    let resolved = if context.is_empty() {
                        key.clone()
                    } else {
                        faucet_core::util::substitute_context(key, context)
                    };
                    if group.is_some() && consumer.is_some() {
                        // Consumer-group mode must go through XREADGROUP so the
                        // group's delivery position is honoured; a plain XRANGE
                        // would re-read the whole stream and ignore the group.
                        let mut records = self
                            .fetch_stream(&mut conn, &resolved, group, consumer, count)
                            .await?;
                        if let Some(max) = max_records {
                            records.truncate(max);
                        }
                        if batch_size == 0 {
                            emitted += records.len();
                            yield StreamPage { records, bookmark: None };
                        } else {
                            let mut remaining = records.into_iter().peekable();
                            while remaining.peek().is_some() {
                                let chunk: Vec<Value> =
                                    remaining.by_ref().take(batch_size).collect();
                                emitted += chunk.len();
                                yield StreamPage { records: chunk, bookmark: None };
                            }
                        }
                    } else {
                        let pages = stream_xrange(&mut conn, &resolved, batch_size, max_records);
                        futures::pin_mut!(pages);
                        while let Some(page) = futures::StreamExt::next(&mut pages).await {
                            let page = page?;
                            emitted += page.records.len();
                            yield page;
                        }
                    }
                }
                RedisSourceType::Keys { pattern } => {
                    let resolved = if context.is_empty() {
                        pattern.clone()
                    } else {
                        faucet_core::util::substitute_context(pattern, context)
                    };
                    let pages = stream_keys(&mut conn, &resolved, batch_size, max_records);
                    futures::pin_mut!(pages);
                    while let Some(page) = futures::StreamExt::next(&mut pages).await {
                        let page = page?;
                        emitted += page.records.len();
                        yield page;
                    }
                }
            }
            tracing::info!(
                records = emitted,
                batch_size,
                "Redis source stream complete",
            );
        })
    }

    /// Returns the schema generated for `RedisSourceConfig` by
    /// `faucet_core::schema_for!`, serialized to a JSON value.
    fn config_schema(&self) -> serde_json::Value {
        serde_json::to_value(faucet_core::schema_for!(RedisSourceConfig))
            .expect("schema serialization")
    }
}
/// Streams the elements of the Redis list at `key` using `LRANGE`.
///
/// Each element is parsed as JSON when possible and otherwise emitted as a JSON
/// string.
///
/// With `batch_size == 0` the whole list is read with one `LRANGE key 0 -1`
/// and yielded as a single page (possibly empty), truncated to `max_records`.
/// Otherwise consecutive index windows of `batch_size` elements are fetched.
/// Paging stops at an empty or short window, or once `max_records` elements
/// have been yielded. Pages carry no bookmark.
///
/// NOTE(review): paging is by index, so a list modified while it is streamed
/// may presumably yield skipped or repeated elements.
///
/// # Errors
///
/// Yields `FaucetError::Source` if an `LRANGE` call fails.
fn stream_list<'a>(
    conn: &'a mut redis::aio::MultiplexedConnection,
    key: &'a str,
    batch_size: usize,
    max_records: Option<usize>,
) -> impl Stream<Item = Result<StreamPage, FaucetError>> + 'a {
    async_stream::try_stream! {
        let lrange_failed =
            |e: redis::RedisError| FaucetError::Source(format!("LRANGE failed on '{key}': {e}"));
        let decode = |raw: String| {
            serde_json::from_str::<Value>(&raw).unwrap_or_else(|_| Value::String(raw))
        };

        if batch_size == 0 {
            let whole: Vec<String> = conn.lrange(key, 0, -1).await.map_err(lrange_failed)?;
            let mut records: Vec<Value> = whole.into_iter().map(decode).collect();
            if let Some(limit) = max_records {
                records.truncate(limit);
            }
            yield StreamPage { records, bookmark: None };
            return;
        }

        let window = batch_size as isize;
        let mut offset: isize = 0;
        let mut sent: usize = 0;
        loop {
            let chunk: Vec<String> = conn
                .lrange(key, offset, offset + window - 1)
                .await
                .map_err(lrange_failed)?;
            let fetched = chunk.len();
            if fetched == 0 {
                break;
            }
            let mut records: Vec<Value> = chunk.into_iter().map(decode).collect();
            let hit_cap = match max_records {
                Some(limit) if sent + fetched >= limit => {
                    records.truncate(limit - sent);
                    true
                }
                _ => false,
            };
            sent += records.len();
            yield StreamPage { records, bookmark: None };
            // A short window means the end of the list was reached.
            if hit_cap || fetched < batch_size {
                break;
            }
            offset += window;
        }
    }
}
/// Streams every entry of the Redis stream at `key` using `XRANGE`, converting
/// each entry with `stream_entry_to_json`.
///
/// With `batch_size == 0` the whole stream is read with one `XRANGE key - +`
/// and yielded as a single page (possibly empty), truncated to `max_records`.
/// Otherwise pages of up to `batch_size` entries are fetched with
/// `XRANGE key <start> + COUNT <batch_size>`. Each following page starts at
/// `next_stream_id` of the last entry seen. Paging stops at an empty or short
/// page, or once `max_records` entries have been yielded. Pages carry no
/// bookmark.
///
/// # Errors
///
/// Yields `FaucetError::Source` if an `XRANGE` call fails.
fn stream_xrange<'a>(
    conn: &'a mut redis::aio::MultiplexedConnection,
    key: &'a str,
    batch_size: usize,
    max_records: Option<usize>,
) -> impl Stream<Item = Result<StreamPage, FaucetError>> + 'a {
    async_stream::try_stream! {
        let xrange_failed =
            |e: redis::RedisError| FaucetError::Source(format!("XRANGE failed on '{key}': {e}"));

        if batch_size == 0 {
            let everything: redis::streams::StreamRangeReply =
                conn.xrange_all(key).await.map_err(xrange_failed)?;
            let mut records: Vec<Value> = everything
                .ids
                .iter()
                .map(|entry| stream_entry_to_json(&entry.id, &entry.map))
                .collect();
            if let Some(limit) = max_records {
                records.truncate(limit);
            }
            yield StreamPage { records, bookmark: None };
            return;
        }

        let mut from_id = String::from("-");
        let mut sent: usize = 0;
        loop {
            let reply: redis::streams::StreamRangeReply = conn
                .xrange_count(key, &from_id, "+", batch_size)
                .await
                .map_err(xrange_failed)?;
            // The next page resumes just after the last ID of this one.
            let resume_at = match reply.ids.last() {
                Some(tail) => next_stream_id(&tail.id),
                None => break,
            };
            let fetched = reply.ids.len();
            let mut records: Vec<Value> = reply
                .ids
                .iter()
                .map(|entry| stream_entry_to_json(&entry.id, &entry.map))
                .collect();
            let hit_cap = match max_records {
                Some(limit) if sent + fetched >= limit => {
                    records.truncate(limit - sent);
                    true
                }
                _ => false,
            };
            sent += records.len();
            yield StreamPage { records, bookmark: None };
            if hit_cap || fetched < batch_size {
                break;
            }
            from_id = resume_at;
        }
    }
}
/// Streams `{"key", "value"}` records for every key matching `pattern`.
///
/// Keys are discovered with cursor-based `SCAN <cursor> MATCH pattern COUNT
/// <hint>` and fetched through `mget_records`, one `MGET` per page of
/// `batch_size` keys. Keys whose `MGET` reply is nil produce no record, so a
/// page may hold fewer than `batch_size` records.
///
/// With `batch_size == 0` the scan runs to completion, using
/// `faucet_core::DEFAULT_BATCH_SIZE` as the `COUNT` hint, and the collected
/// keys are fetched in a single final page. At most `max_records` records are
/// emitted. In paged mode the scan also stops once that many have been
/// emitted. Pages carry no bookmark.
///
/// `SCAN` may report the same key more than once, so every queued key is
/// remembered and repeats are dropped. This keeps one `String` per distinct
/// matched key in memory for the lifetime of the stream.
///
/// # Errors
///
/// Yields `FaucetError::Source` if a `SCAN` or `MGET` command fails.
fn stream_keys<'a>(
    conn: &'a mut redis::aio::MultiplexedConnection,
    pattern: &'a str,
    batch_size: usize,
    max_records: Option<usize>,
) -> impl Stream<Item = Result<StreamPage, FaucetError>> + 'a {
    use faucet_core::DEFAULT_BATCH_SIZE;
    async_stream::try_stream! {
        // COUNT is only a hint to SCAN; fall back to the default when paging is off.
        let scan_hint = if batch_size == 0 { DEFAULT_BATCH_SIZE } else { batch_size };
        // usize::MAX means "never flush mid-scan": everything goes out in the final page.
        let chunk_size = if batch_size == 0 { usize::MAX } else { batch_size };
        let cap = max_records.unwrap_or(usize::MAX);
        let mut cursor: u64 = 0;
        let mut buffer: Vec<String> = Vec::new();
        // Every key queued so far, used to drop SCAN's duplicate reports.
        let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
        let mut emitted: usize = 0;
        'scan: loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(pattern)
                .arg("COUNT")
                .arg(scan_hint)
                .query_async(conn)
                .await
                .map_err(|e| FaucetError::Source(format!("SCAN failed with pattern '{pattern}': {e}")))?;
            cursor = next_cursor;
            buffer.extend(keys.into_iter().filter(|key| seen.insert(key.clone())));
            while emitted < cap && buffer.len() >= chunk_size {
                let take = chunk_size.min(cap - emitted);
                let page_keys: Vec<String> = buffer.drain(..take).collect();
                let records = mget_records(conn, &page_keys).await?;
                emitted += records.len();
                yield StreamPage { records, bookmark: None };
            }
            // A returned cursor of 0 means the full iteration is complete.
            if cursor == 0 || emitted >= cap {
                break 'scan;
            }
        }
        if emitted < cap && !buffer.is_empty() {
            let take = (cap - emitted).min(buffer.len());
            let page_keys: Vec<String> = buffer.drain(..take).collect();
            let records = mget_records(conn, &page_keys).await?;
            yield StreamPage { records, bookmark: None };
        }
    }
}
/// Fetches `keys` with a single `MGET` and converts the replies into
/// `{"key", "value"}` records via `collect_kv_records`.
///
/// Keys whose `MGET` reply is nil produce no record. An empty `keys` slice
/// returns `Ok(vec![])` without contacting Redis, because `MGET` requires at
/// least one key.
///
/// # Errors
///
/// `FaucetError::Source` if the `MGET` command fails.
async fn mget_records(
    conn: &mut redis::aio::MultiplexedConnection,
    keys: &[String],
) -> Result<Vec<Value>, FaucetError> {
    if keys.is_empty() {
        return Ok(Vec::new());
    }
    let values: Vec<Option<String>> = redis::cmd("MGET")
        .arg(keys)
        .query_async(conn)
        .await
        .map_err(|e| FaucetError::Source(format!("MGET failed: {e}")))?;
    Ok(collect_kv_records(keys, values))
}
/// Pairs each key with its `MGET` reply and builds one `{"key", "value"}`
/// record per non-nil value, preserving key order.
///
/// Values that parse as JSON are embedded as parsed JSON; all others are kept
/// as JSON strings. Pairing stops at the shorter of `keys` and `values`.
fn collect_kv_records(keys: &[String], values: Vec<Option<String>>) -> Vec<Value> {
    let mut out = Vec::new();
    for (key, maybe_raw) in keys.iter().zip(values) {
        let Some(raw) = maybe_raw else {
            continue;
        };
        let value = match serde_json::from_str::<Value>(&raw) {
            Ok(parsed) => parsed,
            Err(_) => Value::String(raw),
        };
        out.push(json!({ "key": key, "value": value }));
    }
    out
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RedisSourceConfig;

    #[test]
    fn creates_source() {
        let config = RedisSourceConfig::new(
            "redis://localhost",
            RedisSourceType::List { key: "test".into() },
        );
        let _source = RedisSource::new(config).unwrap();
    }

    #[test]
    fn new_rejects_out_of_range_batch_size() {
        let mut config = RedisSourceConfig::new(
            "redis://localhost",
            RedisSourceType::List { key: "test".into() },
        );
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;
        match RedisSource::new(config) {
            Err(FaucetError::Config(m)) => assert!(m.contains("batch_size"), "got: {m}"),
            Err(other) => panic!("expected a batch_size Config error, got {other:?}"),
            Ok(_) => panic!("expected a batch_size Config error, got Ok"),
        }
    }

    #[test]
    fn next_stream_id_increments_sequence() {
        assert_eq!(next_stream_id("1234-0"), "1234-1");
        assert_eq!(next_stream_id("1234-99"), "1234-100");
    }

    #[test]
    fn next_stream_id_wraps_seq_overflow() {
        let id = format!("5-{}", u64::MAX);
        assert_eq!(next_stream_id(&id), "6-0");
    }

    #[test]
    fn next_stream_id_falls_back_on_malformed_id() {
        let next = next_stream_id("not-a-real-id");
        assert!(next.starts_with("not-a-real-id"));
        assert!(next.ends_with('\u{0}'));
    }

    #[test]
    fn stream_entry_to_json_extracts_id_and_fields() {
        let mut map = std::collections::HashMap::new();
        map.insert(
            "field1".to_string(),
            redis::Value::BulkString(b"value1".to_vec()),
        );
        map.insert("field2".to_string(), redis::Value::Int(42));
        let json = stream_entry_to_json("100-0", &map);
        assert_eq!(json["id"], "100-0");
        assert_eq!(json["fields"]["field1"], "value1");
        assert_eq!(json["fields"]["field2"], 42);
    }

    #[test]
    fn stream_entry_to_json_parses_simple_string_and_keeps_nil() {
        let mut map = std::collections::HashMap::new();
        map.insert(
            "flag".to_string(),
            redis::Value::SimpleString("true".to_string()),
        );
        map.insert("missing".to_string(), redis::Value::Nil);
        let json = stream_entry_to_json("1-1", &map);
        assert_eq!(json["fields"]["flag"], true);
        // Indexing a missing key also yields Null, so check the entry exists.
        let fields = json["fields"].as_object().expect("fields is an object");
        assert_eq!(fields.get("missing"), Some(&Value::Null));
    }

    #[test]
    fn collect_kv_records_parses_json_and_skips_nil_values() {
        let keys = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let values = vec![
            Some(r#"{"x":1}"#.to_string()),
            None,
            Some("plain".to_string()),
        ];
        let records = collect_kv_records(&keys, values);
        assert_eq!(
            records,
            vec![
                json!({ "key": "a", "value": { "x": 1 } }),
                json!({ "key": "c", "value": "plain" }),
            ]
        );
    }

    #[test]
    fn collect_kv_records_handles_empty_input() {
        assert!(collect_kv_records(&[], Vec::new()).is_empty());
    }
}