faucet-source-csv 1.0.0

CSV file source connector for the faucet-stream ecosystem

faucet-source-csv

A CSV file source that reads rows from CSV files and returns them as JSON objects, with configurable delimiters, headers, and quote characters.

Part of the faucet-stream ecosystem.

Installation

[dependencies]
faucet-source-csv = "1.0"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "1.0", features = ["source-csv"] }

Quick Start

use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_core::Source;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = CsvSourceConfig::new("/path/to/data.csv");

    let source = CsvSource::new(config);
    let records = source.fetch_all().await?;

    for record in &records {
        println!("{}", record);
    }
    Ok(())
}

How It Works

  • If the file has headers, each row becomes a JSON object with header names as keys
  • If the file has no headers, keys are generated as column_0, column_1, etc.
  • All field values are returned as JSON strings (no type inference)
  • fetch_all / fetch_with_context read the file via blocking I/O on a spawn_blocking task to avoid starving the async runtime
  • Source::stream_pages reads the file asynchronously through a tokio BufReader and parses records incrementally with csv-async, emitting them in pages of batch_size rows
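
The key-naming rules in the list above can be sketched with the standard library alone. This is a minimal illustration rather than the crate's implementation: row_to_pairs is a hypothetical helper, and falling back to column_N for fields beyond the last header is an assumption made here, not documented behavior.

```rust
/// Builds the (key, value) pairs that would become one JSON object.
/// `headers` is `Some` when the file has a header row; otherwise keys are
/// generated as column_0, column_1, ... Values are kept as strings, with
/// no type inference.
fn row_to_pairs(headers: Option<&[&str]>, fields: &[&str]) -> Vec<(String, String)> {
    fields
        .iter()
        .enumerate()
        .map(|(i, value)| {
            let key = match headers.and_then(|h| h.get(i)) {
                Some(name) => (*name).to_string(),
                // Assumption: also used when a row has more fields than headers.
                None => format!("column_{i}"),
            };
            (key, (*value).to_string())
        })
        .collect()
}
```

Calling row_to_pairs(None, &["1", "Alice"]) yields the keys column_0 and column_1, and both values remain strings.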

Configuration

CsvSourceConfig

Field | Type | Default | Description
----- | ---- | ------- | -----------
path | String | (required) | Path to the CSV file
has_headers | bool | true | Whether the file has a header row
delimiter | u8 | b',' (comma) | Field delimiter byte
quote | u8 | b'"' (double quote) | Quote character byte
batch_size | usize | DEFAULT_BATCH_SIZE (1000) | Rows per emitted StreamPage in Source::stream_pages. 0 is the "no batching" sentinel: all rows are emitted in a single page

Streaming and batching

CsvSource::stream_pages is a true client-side stream: it opens the file with tokio::fs::File wrapped in a tokio::io::BufReader, reads the header record first (if has_headers is set), then parses the remaining records incrementally. Quoted fields containing the delimiter (e.g. "hello, world") parse correctly. Because the file is consumed lazily from the local filesystem, there is no server-side pagination to manage, and client-side memory stays bounded at O(batch_size) rows for any nonzero batch_size, regardless of file size.

batch_size = 0 is the "no batching" sentinel: the file is fully drained and emitted as one page. Useful for small lookup tables or for sinks (SQL COPY, BigQuery load jobs) that prefer one large request to many small ones.
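
The batching behavior described above, including the 0 sentinel, can be sketched as a lazy iterator adapter. This is an illustration under stated assumptions, not the crate's code: Pages is a hypothetical type, it is synchronous where the real path is async, and emitting no page at all for an empty input is an assumption.

```rust
/// Groups rows from an underlying iterator into pages of `batch_size` rows.
/// Only one page is held in memory at a time; `batch_size == 0` drains the
/// whole input into a single page.
struct Pages<I: Iterator> {
    rows: I,
    batch_size: usize,
}

impl<I: Iterator> Iterator for Pages<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        let page: Vec<I::Item> = if self.batch_size == 0 {
            // "No batching" sentinel: take everything that is left.
            self.rows.by_ref().collect()
        } else {
            // Pull at most `batch_size` rows without consuming any extra.
            self.rows.by_ref().take(self.batch_size).collect()
        };
        // Assumption: an exhausted input ends the stream instead of
        // yielding an empty page.
        if page.is_empty() { None } else { Some(page) }
    }
}
```

With five rows and batch_size set to 2, the adapter yields pages of 2, 2, and 1 rows; with batch_size set to 0 it yields one page of 5.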

Multi-line quoted records

Parsing uses csv-async, a streaming RFC-4180 reader that tracks quote state across physical lines. Quoted fields containing embedded newlines (and embedded delimiters) are parsed correctly as a single record, so a file produced by faucet-sink-csv round-trips back losslessly through both fetch_all and the stream_pages streaming path.
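
To show why quote-state tracking matters, here is a small standard-library sketch of a parser that keeps a record open across physical lines while inside quotes. It is not csv-async's implementation: parse_records is a hypothetical function, it works on an in-memory string, it drops carriage returns outside quotes, and it does not report unterminated quotes as errors.

```rust
/// Parses CSV text into records, treating delimiters and newlines inside
/// quoted fields as data and a doubled quote as one literal quote.
fn parse_records(input: &str, delimiter: char, quote: char) -> Vec<Vec<String>> {
    let mut records = Vec::new();
    let mut record: Vec<String> = Vec::new();
    let mut field = String::new();
    let mut in_quotes = false;
    let mut chars = input.chars().peekable();

    while let Some(c) = chars.next() {
        if in_quotes {
            if c == quote {
                if chars.peek() == Some(&quote) {
                    // Doubled quote inside a quoted field: literal quote.
                    field.push(quote);
                    chars.next();
                } else {
                    in_quotes = false;
                }
            } else {
                // Includes embedded delimiters and newlines.
                field.push(c);
            }
        } else if c == quote {
            in_quotes = true;
        } else if c == delimiter {
            record.push(std::mem::take(&mut field));
        } else if c == '\n' {
            // An unquoted newline ends the current record.
            record.push(std::mem::take(&mut field));
            records.push(std::mem::take(&mut record));
        } else if c != '\r' {
            field.push(c);
        }
    }
    // Flush a final record that is not newline-terminated.
    if !field.is_empty() || !record.is_empty() {
        record.push(field);
        records.push(record);
    }
    records
}
```

A parser that splits on physical lines first would cut the quoted field in two; tracking in_quotes across the newline keeps it in a single record.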

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_source_csv::CsvSourceConfig;

// Load from a JSON file
let config: CsvSourceConfig = load_json("config.json")?;
// Or load from a .env file, reading variables with the CSV_SOURCE prefix
let config: CsvSourceConfig = load_env_file(".env", "CSV_SOURCE")?;

Example JSON config

{
  "path": "/data/exports/customers.csv",
  "has_headers": true,
  "delimiter": 44,
  "quote": 34
}

Note: delimiter and quote are specified as byte values (44 = comma, 34 = double quote, 9 = tab).
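
When writing a config file by hand, the numeric value for a delimiter or quote character can be computed rather than looked up. The helper below is a hypothetical convenience, not part of the crate; it rejects non-ASCII characters because both fields are single bytes.

```rust
/// Returns the byte value to write in a JSON or .env config for a
/// single-byte delimiter or quote character, or `None` if the character
/// does not fit in one ASCII byte.
fn config_byte(c: char) -> Option<u8> {
    if c.is_ascii() {
        Some(c as u8)
    } else {
        None
    }
}
```

For example, config_byte('|') returns Some(124) and config_byte('\'') returns Some(39), the values needed for a pipe-delimited file with single-quote quoting.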

Example .env file

CSV_SOURCE_PATH=/data/exports/customers.csv
CSV_SOURCE_HAS_HEADERS=true
CSV_SOURCE_DELIMITER=44
CSV_SOURCE_QUOTE=34
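
The variable names in this example follow a visible pattern: the prefix, an underscore, then the field name in upper case. The sketch below reproduces that pattern with a hypothetical helper, env_key_to_field. It is inferred from the example only and is not a description of how load_env_file is implemented.

```rust
/// Maps an environment variable name to a config field name by removing
/// `prefix` plus one underscore and lowercasing the rest. Returns `None`
/// when the key does not carry the prefix.
fn env_key_to_field(prefix: &str, key: &str) -> Option<String> {
    let rest = key.strip_prefix(prefix)?.strip_prefix('_')?;
    if rest.is_empty() {
        return None;
    }
    Some(rest.to_ascii_lowercase())
}
```

Under this assumed convention, CSV_SOURCE_HAS_HEADERS corresponds to the has_headers field, and variables without the CSV_SOURCE prefix are ignored.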

Config Schema Introspection

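use faucet_core::Source;
use faucet_source_csv::{CsvSource, CsvSourceConfig};

let config = CsvSourceConfig::new("/path/to/data.csv");
let source = CsvSource::new(config);
// Print the connector's config schema as pretty-printed JSON
let schema = source.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);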

Examples

Reading a standard CSV file

use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_core::Source;

let config = CsvSourceConfig::new("/data/users.csv");
let source = CsvSource::new(config);
let records = source.fetch_all().await?;

// Example record: {"id": "1", "name": "Alice", "email": "alice@example.com"}
for record in &records {
    println!("User: {}", record["name"]);
}

Reading a TSV (tab-separated) file

use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_core::Source;

let config = CsvSourceConfig::new("/data/export.tsv")
    .delimiter(b'\t');

let source = CsvSource::new(config);
let records = source.fetch_all().await?;

Reading a file without headers

use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_core::Source;

let config = CsvSourceConfig::new("/data/raw_data.csv")
    .has_headers(false);

let source = CsvSource::new(config);
let records = source.fetch_all().await?;

// Keys are generated: column_0, column_1, column_2, ...
println!("First field: {}", records[0]["column_0"]);

Pipe-delimited file with single-quote quoting

use faucet_source_csv::{CsvSource, CsvSourceConfig};
use faucet_core::Source;

let config = CsvSourceConfig::new("/data/legacy_export.csv")
    .delimiter(b'|')
    .quote(b'\'');

let source = CsvSource::new(config);
let records = source.fetch_all().await?;

Compression

Compression support is gated behind the crate-local compression Cargo feature. Enabling it adds a compression config field that accepts none, gzip, zstd, or auto. The default is auto, which detects .gz / .zst from the file path or object key.

YAML example:

kind: csv
config:
  # ... existing fields ...
  compression: auto  # or 'gzip' | 'zstd' | 'none'

In auto mode, compression is detected from the file path. Multi-line quoted fields (records with embedded newlines inside quotes) are parsed correctly on both the streaming and fetch_all paths, regardless of compression.
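
Extension-based detection of this kind can be sketched with std::path alone. The Compression enum and detect_compression function below are hypothetical names for illustration; matching the extension case-sensitively is an assumption, and the crate's actual rules may differ.

```rust
use std::path::Path;

/// Codec chosen for a given path (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Clone, Copy)]
enum Compression {
    None,
    Gzip,
    Zstd,
}

/// Picks a codec from the final extension of a file path or object key.
/// Anything other than .gz or .zst is treated as uncompressed.
fn detect_compression(path: &str) -> Compression {
    match Path::new(path).extension().and_then(|ext| ext.to_str()) {
        Some("gz") => Compression::Gzip,
        Some("zst") => Compression::Zstd,
        _ => Compression::None,
    }
}
```

Only the last extension is inspected, so users.csv.gz is treated as gzip and users.csv as uncompressed.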

License

Licensed under MIT or Apache-2.0.