faucet-sink-csv
CSV file sink connector for the faucet-stream ecosystem.
Writes JSON records to a CSV file. Column order is the union of keys across the records of the first write_batch() call. Supports configurable delimiters, optional header rows, and append mode. Uses spawn_blocking to avoid blocking the async runtime during file I/O.
Installation
[dependencies]
faucet-sink-csv = "1.0"
tokio = { version = "1", features = ["full"] }
Or via the umbrella crate:
faucet-stream = { version = "1.0", features = ["sink-csv"] }
Quick Start
use ;
use Sink;
use json;
async
Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| path | String | (required) | Path to the output CSV file |
| delimiter | u8 | b',' (comma) | Field delimiter byte |
| write_headers | bool | true | Whether to write a header row with column names |
| append | bool | false | Whether to append to an existing file. When false, the file is truncated on open. |
| batch_size | usize | 1000 | Records per upstream StreamPage. No behavioural impact at this sink; present for symmetry. See Streaming and batching. |
Streaming and batching
This sink writes rows to the output CSV file one at a time via a buffered csv::Writer running inside tokio::task::spawn_blocking. The per-page memory bound for the pipeline is set by the source's batch_size, which is the size of each StreamPage that Pipeline::run hands to Sink::write_batch. On the sink side each page is iterated record by record, and that path, which alone determines the on-disk output, does not depend on batch_size at all.
batch_size is exposed on this config purely for symmetry across every sink in the workspace — sinks like faucet-sink-postgres or faucet-sink-bigquery use the field to size their multi-row inserts / streaming-insert requests, but a per-record file sink has nothing to tune. batch_size = 0 (the "no batching" sentinel) and any positive value are observably identical for this sink: both produce byte-for-byte the same .csv file.
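
The claim that page size does not change the output can be illustrated with a small standard-library sketch. It is not the sink's code: `render_csv` is a hypothetical helper, records are plain string slices, and no CSV quoting is applied. It only shows that writing page by page and record by record yields the same bytes for any page size.

```rust
/// Render records as CSV text, feeding them through in pages of `batch_size`
/// records. `0` is treated as "no batching": everything arrives as one page.
/// (Hypothetical helper; no quoting or escaping is attempted.)
fn render_csv(records: &[Vec<&str>], batch_size: usize) -> String {
    // `chunks` panics on 0, so map the sentinel to "one page holding everything".
    let page_len = if batch_size == 0 {
        records.len().max(1)
    } else {
        batch_size
    };
    let mut out = String::new();
    for page in records.chunks(page_len) {
        // Each page is still written one record at a time.
        for record in page {
            out.push_str(&record.join(","));
            out.push('\n');
        }
    }
    out
}
```

Calling `render_csv` with page sizes of 0, 1, 2, or 1000 on the same input returns identical strings, which mirrors the byte-for-byte claim above.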
Builder Methods
use CsvSinkConfig;
let config = new
.delimiter
.write_headers
.append;
Column Order and Missing Fields
- Column order is the union of keys across all records in the first write_batch() call, in first-seen order, so a field present only in a later record of that batch is still captured (audit #146 H2).
- All subsequent records use the same column order, regardless of their key order.
- If a record is missing a field that exists in the column list, an empty string is written for that cell.
- Keys that appear only in a later write_batch() call (after the header is written) are still ignored: the header is fixed once on the first call.
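
The rules above can be sketched with the standard library alone. This is an illustration, not the crate's implementation: `Record`, `column_order`, `to_row`, and `rec` are hypothetical names, and a record is modelled as ordered key/value string pairs standing in for a JSON object.

```rust
/// A record modelled as ordered (key, value) pairs; a hypothetical stand-in
/// for a JSON object.
type Record = Vec<(String, String)>;

/// Union of keys across the first batch, in first-seen order.
fn column_order(first_batch: &[Record]) -> Vec<String> {
    let mut columns: Vec<String> = Vec::new();
    for record in first_batch {
        for (key, _) in record {
            if !columns.iter().any(|c| c == key) {
                columns.push(key.clone());
            }
        }
    }
    columns
}

/// Project a record onto the fixed columns: missing fields become empty
/// cells, and keys that are not in the column list are dropped.
fn to_row(columns: &[String], record: &Record) -> Vec<String> {
    columns
        .iter()
        .map(|col| {
            record
                .iter()
                .find(|(k, _)| k == col)
                .map(|(_, v)| v.clone())
                .unwrap_or_default()
        })
        .collect()
}

/// Small helper to build a record from string literals.
fn rec(pairs: &[(&str, &str)]) -> Record {
    pairs
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```

With a first batch of `{id, name}` followed by `{name, id, email}`, `column_order` yields `id, name, email`. A later record carrying an unseen key such as `phone` is projected onto those three columns, and the extra key is dropped.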
Value Conversion
| JSON Type | CSV Output |
|---|---|
| null | empty string |
| string | the string value |
| number | string representation |
| boolean | "true" or "false" |
| object / array | JSON serialization (e.g. {"nested":true}) |
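
A minimal sketch of the conversion table, assuming a hand-rolled `Json` enum so that the example needs no external crates. The enum, `to_json_text`, and `to_cell` are hypothetical names; the sink itself works on real JSON records, and its exact number formatting and string escaping may differ.

```rust
/// Hypothetical minimal JSON value, defined only to keep the sketch
/// self-contained.
enum Json {
    Null,
    Bool(bool),
    Number(f64),
    Str(String),
    Array(Vec<Json>),
    Object(Vec<(String, Json)>),
}

/// Compact JSON text for a value. Only backslashes and double quotes are
/// escaped; control characters are not handled in this sketch.
fn to_json_text(v: &Json) -> String {
    match v {
        Json::Null => "null".to_string(),
        Json::Bool(b) => b.to_string(),
        Json::Number(n) => n.to_string(),
        Json::Str(s) => format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\"")),
        Json::Array(items) => {
            let parts: Vec<String> = items.iter().map(to_json_text).collect();
            format!("[{}]", parts.join(","))
        }
        Json::Object(fields) => {
            let parts: Vec<String> = fields
                .iter()
                .map(|(k, val)| {
                    format!("{}:{}", to_json_text(&Json::Str(k.clone())), to_json_text(val))
                })
                .collect();
            format!("{{{}}}", parts.join(","))
        }
    }
}

/// Cell text for a value, following the table above.
fn to_cell(v: &Json) -> String {
    match v {
        Json::Null => String::new(),    // null -> empty cell
        Json::Str(s) => s.clone(),      // strings are written without JSON quotes
        _ => to_json_text(v),           // numbers, booleans, objects, arrays
    }
}
```

The one asymmetry worth noticing is that a top-level string is written raw, while a string nested inside an object or array keeps its JSON quotes.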
Config Loading
use ;
use CsvSinkConfig;
// From a JSON file
let config: CsvSinkConfig = load_json?;
// From an .env file with a prefix
let config: CsvSinkConfig = load_env_file?;
Example JSON config
Note: The delimiter is a byte value (44 = comma, 9 = tab, 124 = pipe |).
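
For readers more used to character delimiters, the byte values in the note map directly onto Rust byte literals. The two helpers below are hypothetical and exist only to show the correspondence; they are not part of the crate.

```rust
/// Human-readable name for a few common delimiter bytes.
fn delimiter_name(byte: u8) -> &'static str {
    match byte {
        b',' => "comma", // 44
        b'\t' => "tab",  // 9
        b'|' => "pipe",  // 124
        _ => "other",
    }
}

/// Join fields with a single-byte (ASCII) delimiter, without any quoting.
fn join_fields(fields: &[&str], delimiter: u8) -> String {
    let sep = (delimiter as char).to_string();
    fields.join(sep.as_str())
}
```

So a config value of 9 corresponds to `b'\t'` in code, and 124 to `b'|'`.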
Example JSON config (TSV)
Example .env file
CSV_SINK_PATH=/data/exports/users.csv
CSV_SINK_DELIMITER=44
CSV_SINK_WRITE_HEADERS=true
CSV_SINK_APPEND=false
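
How a prefixed .env file could map onto the config fields can be sketched as follows. This is an assumption about the general approach, not the crate's loader: `parse_env` is a hypothetical function, and the idea that each field name is the lower-cased remainder after the prefix is inferred from the example above.

```rust
use std::collections::HashMap;

/// Collect `PREFIX_KEY=value` lines into a map keyed by the lower-cased
/// field name, skipping blank lines, comments, and other prefixes.
fn parse_env(text: &str, prefix: &str) -> HashMap<String, String> {
    let mut fields = HashMap::new();
    for line in text.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        if let Some((key, value)) = line.split_once('=') {
            if let Some(name) = key.trim().strip_prefix(prefix) {
                fields.insert(name.to_lowercase(), value.trim().to_string());
            }
        }
    }
    fields
}
```

Parsing the example file with the prefix `CSV_SINK_` gives the keys `path`, `delimiter`, `write_headers`, and `append`, whose values can then be parsed as `String`, `u8`, `bool`, and `bool` respectively.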
Config Schema Introspection
use Sink;
let sink = new;
let schema = sink.config_schema;
println!;
Pipeline Usage
use Pipeline;
use ;
use ;
let source = new;
let sink = new;
let result = new.run.await?;
println!;
Examples
Basic CSV export
let config = new;
let sink = new;
let records = vec!;
sink.write_batch.await?;
sink.flush.await?;
Output (/tmp/users.csv):
id,name,email
1,Alice,alice@example.com
2,Bob,bob@example.com
Tab-separated output (TSV)
let config = new
.delimiter;
let sink = new;
sink.write_batch.await?;
sink.flush.await?;
Append mode for incremental exports
// First run: creates the file with headers
let sink = new;
sink.write_batch.await?;
sink.flush.await?;
drop;
// Second run: appends without headers
let sink = new;
sink.write_batch.await?;
sink.flush.await?;
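
The two-run pattern can be approximated with plain `std::fs`. The sketch assumes the header is written only when the opened file starts out empty, which is one way to get "headers on the first run, none on the second". The sink's actual rule may be implemented differently, and `write_rows` is a hypothetical helper.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Write `rows` to `path`. With `append` the existing content is kept,
/// otherwise the file is truncated on open. The header is written only
/// when the file starts out empty (an assumption of this sketch).
fn write_rows(path: &Path, header: &str, rows: &[&str], append: bool) -> io::Result<()> {
    let mut options = OpenOptions::new();
    options.create(true);
    if append {
        options.append(true);
    } else {
        options.write(true).truncate(true);
    }
    let mut file = options.open(path)?;
    // Checked after opening, so a truncated file counts as empty.
    let starts_empty = file.metadata()?.len() == 0;
    if starts_empty {
        writeln!(file, "{}", header)?;
    }
    for row in rows {
        writeln!(file, "{}", row)?;
    }
    file.flush()
}
```

Two appending calls in a row produce one header followed by both sets of rows. A later call with `append` set to `false` starts the file over with a fresh header.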
Without header row
let config = new
.write_headers;
let sink = new;
sink.write_batch.await?;
sink.flush.await?;
How It Works
- The file is opened lazily on the first write_batch() call. Column order is the union of keys across the records of the first write_batch() call.
- Missing parent directories of path are created automatically (equivalent to mkdir -p) before the file is opened, matching the behaviour of the parquet sink. Dated-subdirectory paths such as ./data/dt=2026-03-08/part.csv work without any pre-creation step.
- All CSV I/O runs inside tokio::task::spawn_blocking to avoid blocking the async runtime.
- A Mutex protects the writer state (column order + csv::Writer) for thread-safe access.
- Headers are written only on file creation (not in append mode) when write_headers is true.
- Multiple write_batch() calls accumulate rows in the same file using the same column order.
- Call flush() to ensure all buffered data is written to disk before dropping the sink or reading the file.
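
The lazy open, the directory creation, and the mutex-guarded writer state can be sketched with the standard library. `LazyFileSink` is a hypothetical stand-in, not the crate's type. It writes plain lines through a `BufWriter` instead of a csv::Writer and leaves out the spawn_blocking hop, so it shows the shape of the state handling only.

```rust
use std::fs::{self, File};
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use std::sync::Mutex;

/// Hypothetical stand-in for the sink: the writer is created on first use
/// and guarded by a mutex.
struct LazyFileSink {
    path: PathBuf,
    writer: Mutex<Option<BufWriter<File>>>,
}

impl LazyFileSink {
    fn new(path: impl Into<PathBuf>) -> Self {
        Self { path: path.into(), writer: Mutex::new(None) }
    }

    /// Write lines, opening the file (and creating parent dirs) on the first call.
    fn write_lines(&self, lines: &[&str]) -> io::Result<()> {
        let mut guard = self.writer.lock().unwrap();
        if guard.is_none() {
            if let Some(parent) = self.path.parent() {
                if !parent.as_os_str().is_empty() {
                    fs::create_dir_all(parent)?; // same effect as `mkdir -p`
                }
            }
            *guard = Some(BufWriter::new(File::create(&self.path)?));
        }
        let writer = guard.as_mut().expect("writer was just initialised");
        for line in lines {
            writeln!(writer, "{}", line)?;
        }
        Ok(())
    }

    /// Push buffered bytes to the file; does nothing if the file was never opened.
    fn flush(&self) -> io::Result<()> {
        let mut guard = self.writer.lock().unwrap();
        if let Some(writer) = guard.as_mut() {
            writer.flush()?;
        }
        Ok(())
    }
}
```

Constructing the value touches nothing on disk. The dated directory and the file appear only on the first `write_lines` call, and buffered rows are guaranteed to be readable from the file only after `flush`.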
Compression
Behind the crate-local compression Cargo feature. Adds a compression config
field with values none, gzip, zstd, or auto (the default — detects
.gz / .zst from the file path / object key).
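
The auto setting can be pictured as a suffix check on the path or object key. The sketch below uses hypothetical names (`Codec`, `detect_codec`) and assumes a case-sensitive match on the final extension; the crate's actual detection rules are not shown in this README.

```rust
/// Hypothetical codec choice resolved from a path or object key.
#[derive(Debug, PartialEq)]
enum Codec {
    None,
    Gzip,
    Zstd,
}

/// Pick a codec from the file suffix (case-sensitive in this sketch).
fn detect_codec(path: &str) -> Codec {
    if path.ends_with(".gz") {
        Codec::Gzip
    } else if path.ends_with(".zst") {
        Codec::Zstd
    } else {
        Codec::None
    }
}
```

Under these assumptions `users.csv.gz` resolves to gzip, `users.csv.zst` to zstd, and a bare `users.csv` is left uncompressed.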
YAML example:
kind: csv
config:
# ... existing fields ...
compression: auto # or 'gzip' | 'zstd' | 'none'
flush() explicitly finalises the compression encoder (writing the gzip/zstd trailer) and propagates any trailer-write I/O error, rather than swallowing it on drop — so a bookmark never advances over a truncated/corrupt file. Subsequent writes append a fresh member. Headers are emitted only on the first open.
License
Licensed under MIT or Apache-2.0.