# faucet-source-s3

An AWS S3 source that reads objects from a bucket and parses them as JSON Lines, JSON arrays, or raw text, with concurrent object reads via `buffer_unordered`.

Part of the faucet-stream ecosystem.
## Installation

```toml
[dependencies]
faucet-source-s3 = "1.0"
tokio = { version = "1", features = ["full"] }
```
Or via the umbrella crate:

```toml
faucet-stream = { version = "1.0", features = ["source-s3"] }
```
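## Quick Start

```rust
use faucet_source_s3::{S3Source, S3SourceConfig};
use faucet_core::Source; // assumed path of the `Source` trait

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // "my-bucket" is a placeholder; `new` is assumed to take the required bucket name.
    let source = S3Source::new(S3SourceConfig::new("my-bucket")).await?;
    let records = source.fetch_all().await?;
    println!("Fetched {} records", records.len());
    Ok(())
}
```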
## Configuration

### `S3SourceConfig`

| Field | Type | Default | Description |
|---|---|---|---|
| `bucket` | `String` | (required) | S3 bucket name |
| `prefix` | `Option<String>` | `None` | Object key prefix filter; only objects whose key starts with this prefix are read |
| `region` | `Option<String>` | `None` | AWS region; `None` uses the SDK default (from environment variables or instance metadata) |
| `endpoint_url` | `Option<String>` | `None` | Custom endpoint URL for S3-compatible services (e.g. MinIO, LocalStack) |
| `file_format` | `S3FileFormat` | `JsonLines` | Format of the files to read |
| `max_objects` | `Option<usize>` | `None` | Maximum number of objects to read; `None` reads all matching objects |
| `concurrency` | `usize` | `10` | Maximum number of concurrent object reads |
| `batch_size` | `usize` | `1000` | Records per `StreamPage` emitted by `Source::stream_pages`; see Streaming and batching below |
## Streaming and batching

`S3Source::stream_pages` lists the objects matching the configured prefix and
streams their records into `StreamPage`s without buffering the full scan.
The exact behaviour depends on `file_format`:

- `JsonLines`: the object body is decoded line by line via
  `tokio::io::AsyncBufReadExt::lines`, so client-side memory is bounded at
  O(`batch_size`) lines regardless of file or scan size. Records flow across
  object boundaries, so a single page may carry lines drawn from multiple objects.
- `RawText`: each object contributes exactly one record (`{"key": ..., "content": ...}`).
  Because the contract requires the whole file as a single string, the object is
  necessarily held in memory once, but it is streamed straight into that one
  `String` via the same decoding reader `JsonLines` uses (no separate raw and
  decompressed copies for compressed objects). Records are then accumulated into
  the same `batch_size` buffer that `JsonLines` uses.
- `JsonArray`: the array can only be validated once the closing `]` is observed,
  so each object is buffered fully and its records are then chunked into pages of
  `batch_size`. This bounds page size at `batch_size` but not peak per-object
  memory; arbitrarily large array files should be re-emitted as `JsonLines`
  upstream so they can be streamed. The default `batch_size = 1000` keeps page
  allocations reasonable for typical file sizes.
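To make the `JsonLines` accumulation concrete, here is a minimal std-only sketch (not the crate's implementation) that packs the non-empty lines of several object bodies into pages of at most `batch_size` lines. The in-memory `&str` bodies and the `page_json_lines` helper are hypothetical stand-ins for the streamed, decoded S3 readers.

```rust
/// Hypothetical sketch: pack the non-empty lines of several decoded object bodies
/// into pages of at most `batch_size` lines. Pages may span object boundaries.
/// Only the batched mode (`batch_size > 0`) is modelled here.
fn page_json_lines(objects: &[&str], batch_size: usize) -> Vec<Vec<String>> {
    assert!(batch_size > 0, "batch_size = 0 (one page per object) is not modelled");
    let mut pages = Vec::new();
    let mut current: Vec<String> = Vec::with_capacity(batch_size);
    for body in objects {
        // `str::lines` stands in for `AsyncBufReadExt::lines` over a decoded S3 body.
        for line in body.lines() {
            if line.trim().is_empty() {
                continue; // blank lines produce no record
            }
            current.push(line.to_string());
            if current.len() == batch_size {
                // Emit a full page. `current` is not flushed at object boundaries,
                // which is what lets one page carry lines from several objects.
                pages.push(std::mem::replace(&mut current, Vec::with_capacity(batch_size)));
            }
        }
    }
    if !current.is_empty() {
        pages.push(current); // trailing partial page
    }
    pages
}

fn main() {
    let objects = ["{\"id\":1}\n{\"id\":2}\n", "{\"id\":3}\n\n{\"id\":4}\n{\"id\":5}\n"];
    for (n, page) in page_json_lines(&objects, 3).iter().enumerate() {
        println!("page {n}: {page:?}");
    }
}
```

With `batch_size = 3`, the first page takes both lines of the first object plus the first line of the second, and the blank line in the second object is skipped.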
**Memory ceiling for `RawText` / `JsonArray`.** Each in-flight read in either format holds one whole decoded object in memory. This is inherent: a `RawText` record is the whole file, and a JSON array isn't valid until its closing `]`. Because objects are fetched concurrently, peak memory is bounded by roughly `concurrency` × (largest object's decoded size), not by `batch_size`. If you read large `RawText` / `JsonArray` objects, lower `concurrency` to cap peak memory, or re-emit the data as `JsonLines` upstream so it streams line by line at O(`batch_size`).
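The rule of thumb above is simple arithmetic. The sketch below (hypothetical helper names, std only) estimates the peak and derives the largest `concurrency` that stays within a memory budget; it ignores parser and allocator overhead, so real usage can be higher.

```rust
const MIB: u64 = 1024 * 1024;

/// Rough peak for RawText/JsonArray reads: up to `concurrency` whole decoded
/// objects in memory at once. Parser and allocator overhead are ignored.
fn peak_bytes(concurrency: usize, largest_decoded_object: u64) -> u64 {
    concurrency as u64 * largest_decoded_object
}

/// Largest `concurrency` whose estimated peak fits in `budget` (never below 1,
/// since at least one object must be in flight for the scan to progress).
fn concurrency_for_budget(budget: u64, largest_decoded_object: u64) -> usize {
    let per_object = largest_decoded_object.max(1); // avoid dividing by zero
    (budget / per_object).max(1) as usize
}

fn main() {
    let largest = 200 * MIB;
    println!("default concurrency: ~{} MiB peak", peak_bytes(10, largest) / MIB);
    println!("concurrency for a 1 GiB budget: {}", concurrency_for_budget(1024 * MIB, largest));
}
```

For example, with the default `concurrency = 10` and a 200 MiB largest object, the estimate is about 2,000 MiB; a 1 GiB budget allows a `concurrency` of 5.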
`batch_size = 0` is the "no batching" sentinel: every emitted
`StreamPage` corresponds to exactly one S3 object, with no within-object
chunking and no cross-object accumulation. Use it for small lookup files,
or for downstream sinks (SQL `COPY`, BigQuery load jobs, Snowflake stage
uploads) that prefer one large request per file to many small ones.
Values larger than `MAX_BATCH_SIZE` (1,000,000) are rejected by
`faucet_core::validate_batch_size`.
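The sentinel and the upper bound amount to a small mode selection. This sketch uses a hypothetical `paging_mode` function and `Paging` enum; it does not reproduce the signature or error type of `faucet_core::validate_batch_size`, only the documented rules (`0` means one page per object, values above 1,000,000 are rejected).

```rust
/// Documented upper bound; mirrors the value of the crate's `MAX_BATCH_SIZE`.
const MAX_BATCH_SIZE: usize = 1_000_000;

/// Hypothetical paging mode derived from `batch_size`.
#[derive(Debug, PartialEq)]
enum Paging {
    /// `batch_size = 0`: one page per S3 object, no chunking, no accumulation.
    PerObject,
    /// `batch_size = n`: pages of at most `n` records.
    Batched(usize),
}

/// Hypothetical stand-in for `faucet_core::validate_batch_size`; the real
/// function's signature and error type are not shown in this README.
fn paging_mode(batch_size: usize) -> Result<Paging, String> {
    match batch_size {
        0 => Ok(Paging::PerObject),
        n if n > MAX_BATCH_SIZE => Err(format!(
            "batch_size {} exceeds MAX_BATCH_SIZE ({})",
            n, MAX_BATCH_SIZE
        )),
        n => Ok(Paging::Batched(n)),
    }
}

fn main() {
    for size in [0, 1000, 2_000_000] {
        println!("{size} -> {:?}", paging_mode(size));
    }
}
```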
The S3 source has no incremental-replication mode today, so every emitted
page carries `bookmark: None`.
**Note:** `JsonArray` streaming is bounded at one full object in memory at a time. True incremental JSON-array parsing (e.g. via `serde_json::StreamDeserializer` over an `AsyncRead`) is a separate follow-up; today the parse happens after the body is fully buffered.
## File Formats (`S3FileFormat`)

| Variant | Description | Record Output |
|---|---|---|
| `JsonLines` (default) | Each line in the file is a separate JSON record | One record per non-empty line |
| `JsonArray` | The entire file is a JSON array of records | One record per array element |
| `RawText` | Each file becomes a single record | `{"key": "<object-key>", "content": "<file-text>"}` |
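## Config Loading

```rust
use faucet_core::{load_env_file, load_json}; // assumed import path
use faucet_source_s3::S3SourceConfig;

// File paths are placeholders; the loader signatures are assumed.
let config: S3SourceConfig = load_json("config.json")?;
// or, from a .env file:
let config: S3SourceConfig = load_env_file(".env")?;
```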
### Example JSON config
### Example `.env` file

```env
S3_SOURCE_BUCKET=my-data-lake
S3_SOURCE_PREFIX=raw/events/
S3_SOURCE_REGION=us-east-1
S3_SOURCE_CONCURRENCY=10
```
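Judging by the keys above, each `S3_SOURCE_*` variable appears to correspond to a config field (`S3_SOURCE_BUCKET` to `bucket`, and so on). The sketch below assumes that mapping is "strip the prefix, lowercase the rest"; `env_to_fields` is hypothetical and does not reproduce `load_env_file`'s actual parsing rules (quoting, escapes, type coercion).

```rust
use std::collections::HashMap;

/// Hypothetical: collect `PREFIX_FIELD=value` lines into `field -> value`,
/// skipping blank lines, comments, and keys without the prefix.
fn env_to_fields(contents: &str, prefix: &str) -> HashMap<String, String> {
    contents
        .lines()
        .map(str::trim)
        .filter(|l| !l.is_empty() && !l.starts_with('#'))
        .filter_map(|l| l.split_once('='))
        .filter_map(|(k, v)| {
            k.trim()
                .strip_prefix(prefix)
                .map(|field| (field.to_ascii_lowercase(), v.trim().to_string()))
        })
        .collect()
}

fn main() {
    let env = "S3_SOURCE_BUCKET=my-data-lake\nS3_SOURCE_PREFIX=raw/events/\n\
               S3_SOURCE_REGION=us-east-1\nS3_SOURCE_CONCURRENCY=10\n";
    let fields = env_to_fields(env, "S3_SOURCE_");
    // Numeric fields such as `concurrency` still need parsing into their Rust type.
    let concurrency: usize = fields["concurrency"].parse().expect("concurrency must be a usize");
    println!("bucket={} concurrency={}", fields["bucket"], concurrency);
}
```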
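## Config Schema Introspection

```rust
use faucet_core::Source; // assumed path of the `Source` trait
use faucet_source_s3::S3Source;

let source = S3Source::new(config).await?;
let schema = source.config_schema();
// Assumes the schema is a `serde_json::Value`; `{:#}` pretty-prints it.
println!("{schema:#}");
```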
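## Examples

### Reading JSON Lines files from S3

```rust
use faucet_source_s3::{S3Source, S3SourceConfig};
use faucet_core::Source; // assumed path of the `Source` trait

// Placeholder values; builder methods are assumed to accept string literals.
let config = S3SourceConfig::new("my-data-lake")
    .prefix("raw/events/")
    .region("us-east-1")
    .concurrency(20);
let source = S3Source::new(config).await?;
let records = source.fetch_all().await?;
println!("Read {} records", records.len());
```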
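### Reading JSON array files from MinIO

```rust
use faucet_source_s3::{S3FileFormat, S3Source, S3SourceConfig};
use faucet_core::Source; // assumed path of the `Source` trait

// Placeholder values; http://localhost:9000 is MinIO's default local API endpoint.
let config = S3SourceConfig::new("my-bucket")
    .endpoint_url("http://localhost:9000")
    .region("us-east-1")
    .file_format(S3FileFormat::JsonArray)
    .prefix("exports/");
let source = S3Source::new(config).await?;
let records = source.fetch_all().await?;
```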
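### Reading raw text files

```rust
use faucet_source_s3::{S3FileFormat, S3Source, S3SourceConfig};
use faucet_core::Source; // assumed path of the `Source` trait

let config = S3SourceConfig::new("my-bucket") // placeholder values
    .prefix("docs/")
    .file_format(S3FileFormat::RawText)
    .max_objects(100);
let source = S3Source::new(config).await?;
let records = source.fetch_all().await?;
// Each record has "key" and "content" fields (records assumed to be serde_json::Value)
for record in &records {
    println!("{}: {} bytes", record["key"], record["content"].as_str().map_or(0, str::len));
}
```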
## AWS Authentication

This source uses the standard AWS SDK credential chain. Credentials are resolved automatically from (in order):

- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`)
- AWS config files (`~/.aws/credentials`, `~/.aws/config`)
- IAM roles (EC2 instance profiles, ECS task roles, Lambda execution roles)

No credential fields are included in the config; use the standard AWS environment instead.
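A credential chain tries providers in order and uses the first one that yields credentials. This is a simplified std-only sketch covering only the three sources listed above; the real AWS SDK chain includes more providers and caching, and the `resolve` and `from_env` helpers, the `Credentials` struct, and the stub providers are all hypothetical. A `HashMap` stands in for the process environment so the example is deterministic.

```rust
use std::collections::HashMap;

/// Hypothetical credential record (the SDK's own type differs).
#[derive(Debug, PartialEq)]
struct Credentials {
    access_key_id: String,
    secret_access_key: String,
    session_token: Option<String>,
}

/// Environment-variable provider; `env` stands in for the real process environment.
fn from_env(env: &HashMap<&str, &str>) -> Option<Credentials> {
    Some(Credentials {
        // Both keys are required; a missing one makes this provider yield nothing.
        access_key_id: env.get("AWS_ACCESS_KEY_ID")?.to_string(),
        secret_access_key: env.get("AWS_SECRET_ACCESS_KEY")?.to_string(),
        // The session token is optional (present for temporary credentials).
        session_token: env.get("AWS_SESSION_TOKEN").map(|t| t.to_string()),
    })
}

/// Try providers in order; return the name and credentials of the first that succeeds.
fn resolve(chain: &[(&'static str, &dyn Fn() -> Option<Credentials>)]) -> Option<(&'static str, Credentials)> {
    chain.iter().find_map(|(name, provider)| provider().map(|creds| (*name, creds)))
}

fn main() {
    let env = HashMap::from([
        ("AWS_ACCESS_KEY_ID", "AKIDEXAMPLE"),
        ("AWS_SECRET_ACCESS_KEY", "example-secret"),
    ]);
    let env_provider = || from_env(&env);
    // Stubs: a real chain would read ~/.aws/* and query the instance metadata service.
    let profile_provider = || -> Option<Credentials> { None };
    let role_provider = || -> Option<Credentials> { None };
    let chain: [(&'static str, &dyn Fn() -> Option<Credentials>); 3] = [
        ("environment", &env_provider),
        ("config files", &profile_provider),
        ("IAM role", &role_provider),
    ];
    match resolve(&chain) {
        Some((source, creds)) => println!("using {source} credentials ({})", creds.access_key_id),
        None => println!("no credentials found"),
    }
}
```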
## Compression

Available behind the crate-local `compression` Cargo feature, which adds a `compression` config
field with values `none`, `gzip`, `zstd`, or `auto` (the default; it detects
`.gz` / `.zst` from the file path / object key).
YAML example:

```yaml
kind: s3
config:
  # ... existing fields ...
  compression: auto # or 'gzip' | 'zstd' | 'none'
```
The codec resolves per object key, so a single source can read a mix of compressed and uncompressed objects in one run.
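Per-key resolution can be sketched as a pure function from the configured setting and an object key to a codec. The `Compression` and `Codec` enums and `resolve_codec` are hypothetical; the suffix checks here are case-sensitive, and whether the crate matches case-insensitively is not specified above.

```rust
/// Hypothetical mirror of the `compression` setting (names follow the documented values).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Compression {
    None,
    Gzip,
    Zstd,
    Auto,
}

/// The codec actually applied to one object.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Codec {
    Identity,
    Gzip,
    Zstd,
}

/// Resolve the codec for a single object key. Explicit settings win;
/// `Auto` looks only at the key's suffix.
fn resolve_codec(setting: Compression, key: &str) -> Codec {
    match setting {
        Compression::None => Codec::Identity,
        Compression::Gzip => Codec::Gzip,
        Compression::Zstd => Codec::Zstd,
        Compression::Auto if key.ends_with(".gz") => Codec::Gzip,
        Compression::Auto if key.ends_with(".zst") => Codec::Zstd,
        Compression::Auto => Codec::Identity,
    }
}

fn main() {
    // One run over mixed objects: each key gets its own codec under `auto`.
    for key in ["events/a.jsonl.gz", "events/b.jsonl.zst", "events/c.jsonl"] {
        println!("{key} -> {:?}", resolve_codec(Compression::Auto, key));
    }
}
```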
## License

Licensed under MIT or Apache-2.0.