faucet-source-csv
A CSV file source that reads rows from CSV files and returns them as JSON objects, with configurable delimiters, headers, and quote characters.
Part of the faucet-stream ecosystem.
Installation
```toml
[dependencies]
faucet-source-csv = "1.0"
tokio = { version = "1", features = ["full"] }
```
Or via the umbrella crate:
```toml
faucet-stream = { version = "1.0", features = ["source-csv"] }
```
Quick Start
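```rust
use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_stream::Source; // `Source` trait (import path assumed)

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let source = CsvSource::new(CsvSourceConfig::new("customers.csv"));
    let records = source.fetch_all().await?;
    println!("read {} records", records.len());
    Ok(())
}
```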
How It Works
- If the file has headers, each row becomes a JSON object with header names as keys
- If the file has no headers, keys are generated as `column_0`, `column_1`, etc.
- All field values are returned as JSON strings (no type inference)
- `fetch_all` / `fetch_with_context` read the file via blocking I/O on a `spawn_blocking` task to avoid starving the async runtime
- `Source::stream_pages` reads the file via async line-streaming on a tokio `BufReader` and parses each line with a single-record `csv::ReaderBuilder`
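The key-naming rules above can be sketched without any CSV library. This is a minimal illustration, not the crate's code: `rows_to_records` is a hypothetical helper that takes rows already split into fields and uses a `BTreeMap<String, String>` as a stand-in for a JSON object.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: map already-split rows to string-keyed objects.
fn rows_to_records(rows: &[Vec<&str>], has_headers: bool) -> Vec<BTreeMap<String, String>> {
    let (keys, data): (Vec<String>, &[Vec<&str>]) = if has_headers {
        match rows.split_first() {
            // The first row supplies the keys; the remaining rows are data.
            Some((header, rest)) => (header.iter().map(|h| h.to_string()).collect(), rest),
            None => return Vec::new(),
        }
    } else {
        // No header row: generate column_0, column_1, ... up to the widest row.
        let width = rows.iter().map(|r| r.len()).max().unwrap_or(0);
        ((0..width).map(|i| format!("column_{i}")).collect(), rows)
    };

    data.iter()
        .map(|row| {
            // zip stops at the shorter side, so a short row simply gets fewer keys.
            keys.iter()
                .zip(row.iter())
                .map(|(k, v)| (k.clone(), v.to_string())) // values stay strings
                .collect::<BTreeMap<String, String>>()
        })
        .collect()
}
```

Every value stays a `String`, matching the "no type inference" rule; turning `"1"` into a number is left to downstream code.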
Configuration
CsvSourceConfig
| Field | Type | Default | Description |
|---|---|---|---|
| `path` | `String` | (required) | Path to the CSV file |
| `has_headers` | `bool` | `true` | Whether the file has a header row |
| `delimiter` | `u8` | `b','` (comma) | Field delimiter byte |
| `quote` | `u8` | `b'"'` (double quote) | Quote character byte |
| `batch_size` | `usize` | `DEFAULT_BATCH_SIZE` (1000) | Rows per emitted `StreamPage` in `Source::stream_pages`. `0` is the "no batching" sentinel: all rows are emitted in a single page |
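The table's defaults, together with the builder-style setters used in the examples (`.delimiter`, `.has_headers`, `.quote`), can be mirrored by a plain struct. `CsvConfigSketch` is a hypothetical stand-in, not `CsvSourceConfig` itself; its fields follow the table, and the `batch_size` setter is assumed for illustration.

```rust
/// Hypothetical mirror of the documented fields and defaults (not the crate's type).
#[derive(Debug, Clone, PartialEq)]
pub struct CsvConfigSketch {
    pub path: String,
    pub has_headers: bool,
    pub delimiter: u8,
    pub quote: u8,
    pub batch_size: usize,
}

/// Assumed to match the documented DEFAULT_BATCH_SIZE.
pub const DEFAULT_BATCH_SIZE: usize = 1000;

impl CsvConfigSketch {
    /// Only `path` is required; every other field takes the table's default.
    pub fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            has_headers: true,
            delimiter: b',',
            quote: b'"',
            batch_size: DEFAULT_BATCH_SIZE,
        }
    }

    // Consuming setters allow chaining: new(..).delimiter(..).quote(..)
    pub fn has_headers(mut self, v: bool) -> Self { self.has_headers = v; self }
    pub fn delimiter(mut self, b: u8) -> Self { self.delimiter = b; self }
    pub fn quote(mut self, b: u8) -> Self { self.quote = b; self }
    pub fn batch_size(mut self, n: usize) -> Self { self.batch_size = n; self }
}
```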
Streaming and batching
CsvSource::stream_pages is a true client-side stream: it opens the file via
tokio::fs::File + tokio::io::BufReader, reads the header line first (if
has_headers), then iterates the remaining lines via
AsyncBufReadExt::lines. Each line is parsed through a single-record
csv::ReaderBuilder so quoted fields containing the delimiter
(e.g. "hello, world") parse correctly. There is no server-side concern —
the file is consumed lazily from the local filesystem, so client-side memory
is bounded at O(batch_size) regardless of file size.
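To see why a quote-aware parse matters, compare a naive `split(',')` with a parser that tracks whether it is inside quotes. `split_line` below is a hypothetical, std-only helper for one physical line; the crate delegates this work to a CSV library, so treat it as an illustration of the idea rather than its implementation.

```rust
/// Hypothetical: split one line into fields, honouring quotes so that a
/// delimiter inside a quoted field stays part of the field.
/// A doubled quote ("") inside a quoted field is an escaped quote.
fn split_line(line: &str, delimiter: char, quote: char) -> Vec<String> {
    let mut fields = Vec::new();
    let mut field = String::new();
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();

    while let Some(c) = chars.next() {
        if in_quotes {
            if c == quote {
                if chars.peek() == Some(&quote) {
                    // Escaped quote: keep one, skip the second.
                    field.push(quote);
                    chars.next();
                } else {
                    in_quotes = false; // closing quote
                }
            } else {
                field.push(c); // delimiters are literal inside quotes
            }
        } else if c == quote {
            in_quotes = true; // opening quote
        } else if c == delimiter {
            fields.push(std::mem::take(&mut field));
        } else {
            field.push(c);
        }
    }
    fields.push(field); // last field (possibly empty)
    fields
}
```

For `1,"hello, world",x`, a naive split yields four pieces, while `split_line` yields the three intended fields.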
batch_size = 0 is the "no batching" sentinel: the file is fully drained
and emitted as one page. Useful for small lookup tables or for sinks (SQL
COPY, BigQuery load jobs) that prefer one large request to many small
ones.
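The page-size rule, including the `0` sentinel, can be expressed as an iterator adapter that buffers at most one page at a time. `Pages` and `pages` are hypothetical names, and the sketch assumes an empty input yields no pages at all, an assumption the crate may not share.

```rust
/// Hypothetical: group a lazily produced row iterator into pages.
struct Pages<I: Iterator> {
    rows: I,
    batch_size: usize,
    done: bool,
}

impl<I: Iterator> Iterator for Pages<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let page: Vec<I::Item> = if self.batch_size == 0 {
            // "No batching" sentinel: drain everything into a single page.
            self.done = true;
            self.rows.by_ref().collect()
        } else {
            // Pull at most `batch_size` rows; only this page is held in memory.
            self.rows.by_ref().take(self.batch_size).collect()
        };
        if page.is_empty() {
            self.done = true; // input exhausted: no trailing empty page
            None
        } else {
            Some(page)
        }
    }
}

fn pages<I: IntoIterator>(rows: I, batch_size: usize) -> Pages<I::IntoIter> {
    Pages { rows: rows.into_iter(), batch_size, done: false }
}
```

With a positive `batch_size`, memory stays proportional to one page; with `0`, the whole input is buffered, which is why the sentinel suits small lookup tables or bulk-load sinks.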
Multi-line quoted records
Parsing uses csv-async, a streaming RFC-4180 reader that tracks quote
state across physical lines. Quoted fields containing embedded newlines
(and embedded delimiters) are parsed correctly as a single record, so a
file produced by faucet-sink-csv round-trips back losslessly through
both fetch_all and the stream_pages streaming path.
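Tracking quote state across physical lines can be sketched with a parity rule: while an odd number of quote characters has been seen, the record is still open, so the next line (and the newline between them) belongs to it. `logical_records` is a hypothetical helper and a simplification, not csv-async's actual state machine.

```rust
/// Hypothetical: join physical lines into logical CSV records.
fn logical_records<'a>(lines: impl IntoIterator<Item = &'a str>, quote: char) -> Vec<String> {
    let mut records = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;

    for line in lines {
        if in_quotes {
            // Still inside a quoted field: restore the newline the line split removed.
            current.push('\n');
        }
        // Each quote flips the state; an escaped "" flips it twice (no net change).
        for c in line.chars() {
            if c == quote {
                in_quotes = !in_quotes;
            }
        }
        current.push_str(line);
        if !in_quotes {
            records.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        // Unterminated quote at end of input: keep the partial record.
        records.push(current);
    }
    records
}
```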
Config Loading
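```rust
use faucet_source_csv::CsvSourceConfig;
use faucet_stream::{load_env_file, load_json}; // import path assumed

let config: CsvSourceConfig = load_json("csv_source.json")?; // path to a JSON config
let config: CsvSourceConfig = load_env_file(".env")?; // reads CSV_SOURCE_* keys
```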
Example JSON config
Note: delimiter and quote are specified as byte values (44 = comma, 34 = double quote, 9 = tab).
Example .env file
```
CSV_SOURCE_PATH=/data/exports/customers.csv
CSV_SOURCE_HAS_HEADERS=true
CSV_SOURCE_DELIMITER=44
CSV_SOURCE_QUOTE=34
```
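The `.env` keys map onto the config fields by stripping the `CSV_SOURCE_` prefix, and `delimiter`/`quote` parse as decimal `u8` values. The sketch below is a hypothetical `parse_env`, not the crate's `load_env_file`; it assumes one `KEY=VALUE` per line, skips blank lines, `#` comments and unknown keys, and fills omitted fields from the defaults in the configuration table.

```rust
/// Hypothetical mirror of the fields that the .env example sets.
#[derive(Debug, PartialEq)]
struct EnvCsvConfig {
    path: String,
    has_headers: bool,
    delimiter: u8,
    quote: u8,
}

fn parse_env(text: &str) -> Result<EnvCsvConfig, String> {
    // Start from the documented defaults; only `path` has none.
    let mut cfg = EnvCsvConfig { path: String::new(), has_headers: true, delimiter: b',', quote: b'"' };
    for line in text.lines().map(str::trim).filter(|l| !l.is_empty() && !l.starts_with('#')) {
        let (key, value) = line
            .split_once('=')
            .ok_or_else(|| format!("expected KEY=VALUE, got: {line}"))?;
        match key.strip_prefix("CSV_SOURCE_") {
            Some("PATH") => cfg.path = value.to_string(),
            Some("HAS_HEADERS") => {
                cfg.has_headers = value.parse::<bool>().map_err(|e| format!("{key}: {e}"))?
            }
            // Byte values are decimal: 44 = ',', 34 = '"', 9 = '\t'.
            Some("DELIMITER") => cfg.delimiter = value.parse::<u8>().map_err(|e| format!("{key}: {e}"))?,
            Some("QUOTE") => cfg.quote = value.parse::<u8>().map_err(|e| format!("{key}: {e}"))?,
            _ => {} // unrelated keys are ignored
        }
    }
    if cfg.path.is_empty() {
        return Err("CSV_SOURCE_PATH is required".to_string());
    }
    Ok(cfg)
}
```

Writing `CSV_SOURCE_DELIMITER=,` would fail here, since `,` is not a decimal byte value.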
Config Schema Introspection
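```rust
use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_stream::Source; // `Source` trait (import path assumed)

let source = CsvSource::new(CsvSourceConfig::new("customers.csv"));
let schema = source.config_schema();
println!("{schema:#?}");
```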
Examples
Reading a standard CSV file
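```rust
use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_stream::Source; // `Source` trait (import path assumed)

let config = CsvSourceConfig::new("customers.csv");
let source = CsvSource::new(config);
let records = source.fetch_all().await?;
// Example record: {"id": "1", "name": "Alice", "email": "alice@example.com"}
for record in &records {
    println!("{}", record["name"]);
}
```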
Reading a TSV (tab-separated) file
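```rust
use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_stream::Source; // `Source` trait (import path assumed)

let config = CsvSourceConfig::new("data.tsv")
    .delimiter(b'\t');
let source = CsvSource::new(config);
let records = source.fetch_all().await?;
```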
Reading a file without headers
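```rust
use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_stream::Source; // `Source` trait (import path assumed)

let config = CsvSourceConfig::new("data.csv")
    .has_headers(false);
let source = CsvSource::new(config);
let records = source.fetch_all().await?;
// Keys are generated: column_0, column_1, column_2, ...
println!("{}", records[0]["column_0"]);
```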
Pipe-delimited file with single-quote quoting
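```rust
use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_stream::Source; // `Source` trait (import path assumed)

let config = CsvSourceConfig::new("data.psv")
    .delimiter(b'|')
    .quote(b'\'');
let source = CsvSource::new(config);
let records = source.fetch_all().await?;
```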
Compression
Available behind the crate-local `compression` Cargo feature, which adds a `compression` config
field with values `none`, `gzip`, `zstd`, or `auto` (the default: detects
`.gz` / `.zst` from the file path or object key).
YAML example:
```yaml
kind: csv
config:
  # ... existing fields ...
  compression: auto  # or 'gzip' | 'zstd' | 'none'
```
With `auto`, the codec is detected from the file path. Multi-line quoted fields (records with embedded newlines inside quotes) are parsed correctly on both the streaming and `fetch_all` paths, regardless of compression.
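The `auto` setting can be sketched as a suffix check on the path or object key. `Codec`, `detect_codec` and `resolve` are hypothetical names; actual decompression is out of scope here, and the match is case-sensitive by assumption.

```rust
/// Hypothetical codec choice for the `compression` setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Codec {
    Plain,
    Gzip,
    Zstd,
}

/// `auto`: choose the codec from the path / object-key suffix.
fn detect_codec(path: &str) -> Codec {
    if path.ends_with(".gz") {
        Codec::Gzip
    } else if path.ends_with(".zst") {
        Codec::Zstd
    } else {
        Codec::Plain
    }
}

/// Map the configured value to a codec; `auto` defers to the path.
fn resolve(setting: &str, path: &str) -> Result<Codec, String> {
    match setting {
        "none" => Ok(Codec::Plain),
        "gzip" => Ok(Codec::Gzip),
        "zstd" => Ok(Codec::Zstd),
        "auto" => Ok(detect_codec(path)),
        other => Err(format!("unknown compression setting: {other}")),
    }
}
```

An explicit setting wins over the file name, so `gzip` applied to `customers.csv` still selects gzip.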
License
Licensed under MIT or Apache-2.0.