faucet-source-redis
A Redis source that reads records from lists, streams, or key patterns, returning them as JSON.
Part of the faucet-stream ecosystem.
Installation
[]
= "1.0"
= { = "1", = ["full"] }
Or via the umbrella crate:
= { = "1.0", = ["source-redis"] }
Quick Start
use ;
async
Configuration
RedisSourceConfig
| Field | Type | Default | Description |
|---|---|---|---|
url |
String |
(required) | Redis connection URL (e.g. "redis://127.0.0.1:6379"). Masked in debug output for security |
source_type |
RedisSourceType |
(required) | The type of Redis data structure to read from |
max_records |
Option<usize> |
None |
Optional maximum number of records to return (truncates after fetching) |
batch_size |
usize |
DEFAULT_BATCH_SIZE (1000) |
Records per emitted StreamPage when driven via Source::stream_pages. 0 is the "no batching" sentinel — drains the underlying primitive and emits a single page. See Streaming and batching. |
Source Types (RedisSourceType)
List
Read all elements from a Redis list via LRANGE 0 -1.
| Field | Type | Description |
|---|---|---|
key |
String |
The list key |
Each list element is parsed as JSON. If an element is not valid JSON, it is returned as a JSON string.
Stream
Read entries from a Redis stream via XREAD or XREADGROUP.
| Field | Type | Default | Description |
|---|---|---|---|
key |
String |
(required) | The stream key |
group |
Option<String> |
None |
Consumer group name. When set, uses XREADGROUP |
consumer |
Option<String> |
None |
Consumer name within the group (required when group is set) |
count |
Option<usize> |
None (100 for XREADGROUP) |
Maximum number of entries to read per call |
Each stream entry is returned as:
Stream field values are parsed as JSON where possible; otherwise they are returned as strings.
Keys
Scan for keys matching a glob pattern, then MGET all matched keys in a single round-trip.
| Field | Type | Description |
|---|---|---|
pattern |
String |
Glob pattern for SCAN (e.g. "user:*") |
Each key-value pair is returned as:
Values are parsed as JSON where possible; otherwise they are returned as strings.
Streaming and batching
RedisSource overrides
Source::stream_pages
so the pipeline can write pages to the sink as they arrive instead of
buffering the full result set. Each mode maps batch_size onto its native
paging primitive:
| Mode | Native primitive | Per-page command |
|---|---|---|
Keys |
SCAN COUNT batch_size cursor → MGET per batch |
SCAN MATCH <pattern> COUNT <batch_size> then MGET key1 ... keyN |
Stream |
XRANGE - + COUNT batch_size, advancing start ID |
XRANGE <key> <start> + COUNT <batch_size> |
List |
LRANGE start stop, sliding the window |
LRANGE <key> <start> <start + batch_size - 1> |
batch_size = 0 is the "no batching" sentinel:
Keys: theSCANcursor is drained to completion and a singleMGETretrieves all matched keys; oneStreamPageis yielded.Stream: a singleXRANGE - +drains the stream; oneStreamPageis yielded.List: a singleLRANGE 0 -1drains the list; oneStreamPageis yielded.
Bookmarks are None on every page — the Redis source has no
incremental-replication mode today.
When the consumer-group fields (group / consumer) are set, they apply only
to the XREADGROUP path used by fetch_all. Streaming via stream_pages
always uses XRANGE because the page-then-write contract is incompatible
with XREADGROUP's deferred-acknowledgement semantics.
The trait-level batch_size argument that stream_pages receives is ignored
in favour of RedisSourceConfig::batch_size — the config is the user-facing
knob, and routing the pipeline hint through it would silently override an
explicit config value.
Config Loading
use ;
use RedisSourceConfig;
let config: RedisSourceConfig = load_json?;
let config: RedisSourceConfig = load_env_file?;
Example JSON config (List)
Example JSON config (Stream with consumer group)
Example JSON config (Keys)
Example .env file
REDIS_SOURCE_URL=redis://127.0.0.1:6379
REDIS_SOURCE_MAX_RECORDS=1000
Config Schema Introspection
use Source;
let source = new;
let schema = source.config_schema;
println!;
Examples
Reading from a list
use ;
use Source;
let config = new
.max_records;
let source = new;
let notifications = source.fetch_all.await?;
Reading from a stream with consumer group
use ;
use Source;
let config = new;
let source = new;
let events = source.fetch_all.await?;
for event in &events
Scanning keys by pattern
use ;
use Source;
let config = new;
let source = new;
let users = source.fetch_all.await?;
for user in &users
License
Licensed under MIT or Apache-2.0.