faucet-transform-sql 1.0.0

SQL-as-transform for faucet-stream — run DuckDB SQL over each pipeline page. The page's records are exposed as the relation batch; the result set replaces the page.

[dependencies]
faucet-transform-sql = "1.0"

Enable in the umbrella crate or CLI:

faucet-stream = { version = "1.0", features = ["transform-sql"] }

What it does

Each pipeline page is loaded into a temporary in-memory DuckDB table named batch, and the query runs against it. The query's result set becomes the new page, with each result row emitted as one output JSON record. The mapping is: column name → JSON key; NULL → JSON null; STRUCT/LIST/MAP → nested JSON.

This makes it straightforward to filter, reshape, aggregate, and join inline data without a separate warehouse step.
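
As a rough illustration of the row-to-record mapping described above, the following Rust sketch renders one result row as a JSON object. The Cell enum and both function names are hypothetical and are not part of faucet-transform-sql; the real transform covers the full DuckDB type system and proper JSON escaping.

```rust
/// Simplified stand-in for a DuckDB cell value (hypothetical type).
enum Cell {
    Null,
    Int(i64),
    Text(String),
    List(Vec<Cell>),
}

/// Render one cell as JSON text.
fn cell_to_json(cell: &Cell) -> String {
    match cell {
        // SQL NULL becomes JSON null.
        Cell::Null => "null".to_string(),
        Cell::Int(n) => n.to_string(),
        // Escapes only backslash and double quote; a real encoder handles more.
        Cell::Text(s) => format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\"")),
        // Nested values become nested JSON.
        Cell::List(items) => {
            let parts: Vec<String> = items.iter().map(cell_to_json).collect();
            format!("[{}]", parts.join(","))
        }
    }
}

/// Render one result row: each column name becomes a JSON key.
fn row_to_json(columns: &[&str], row: &[Cell]) -> String {
    let fields: Vec<String> = columns
        .iter()
        .zip(row)
        .map(|(name, cell)| format!("\"{}\":{}", name, cell_to_json(cell)))
        .collect();
    format!("{{{}}}", fields.join(","))
}
```

For columns id, name, tags and the row (1, NULL, ['a']), row_to_json returns {"id":1,"name":null,"tags":["a"]}.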

Config

- type: sql
  config:
    query: "SELECT id, upper(name) AS name FROM batch WHERE active"

Wire shape: { type: sql, config: { query, relations?, memory_limit?, threads? } }.

Field reference

| Field | Type | Required | Description |
|---|---|---|---|
| query | string | yes | The SQL statement. batch is the page's records as a table. |
| relations | list | no | Reference relations loaded at compile time and joinable by name. |
| memory_limit | string | no | DuckDB memory_limit pragma (e.g. "1GB"). Default: DuckDB's own. |
| threads | integer | no | DuckDB threads pragma. Default: DuckDB's own. Set to 1–2 in high-fan-out matrices to avoid CPU over-subscription. |

Reference relations

Pre-load static lookup data (CSV, JSONL, or inline values) that your query can JOIN against. Relations are loaded once at compile time (when faucet validate / faucet run first reads the config) and remain resident for the lifetime of the transform.

- type: sql
  config:
    query: |
      SELECT b.id, c.country
      FROM batch b
      LEFT JOIN countries c ON b.code = c.code
    relations:
      - name: countries
        source:
          type: csv
          path: data/countries.csv
          has_header: true        # default true

Relation source types

| type | Fields | Description |
|---|---|---|
| csv | path (string, required), has_header (bool, default true) | Loaded via DuckDB read_csv_auto. |
| jsonl | path (string, required) | Loaded via DuckDB read_json_auto. |
| values | columns (list of strings), rows (list of lists) | Inline data; no file I/O. |

Inline values example:

relations:
  - name: tiers
    source:
      type: values
      columns: [id, label]
      rows:
        - [1, gold]
        - [2, silver]
        - [3, bronze]

reload_on_change

relations:
  - name: prices
    source:
      type: csv
      path: data/prices.csv
    reload_on_change: true   # re-read when the file's mtime changes

When true, faucet stats the file before each page and rebuilds the relation atomically if the file's mtime has changed. Defaults to false. Ignored for values sources, which read no file.
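
The mtime check can be pictured roughly as follows. This is a sketch under assumptions, not faucet's implementation: ReloadTracker, observe, and needs_reload are hypothetical names, and the atomic rebuild itself is left out.

```rust
use std::fs;
use std::io;
use std::path::Path;
use std::time::SystemTime;

/// Remembers the mtime seen when the relation was last loaded (hypothetical type).
struct ReloadTracker {
    last_mtime: SystemTime,
}

impl ReloadTracker {
    /// Start from the mtime observed at the initial load.
    fn new(initial: SystemTime) -> Self {
        Self { last_mtime: initial }
    }

    /// Returns true when `current` differs from the remembered mtime,
    /// and remembers `current` for the next page.
    fn observe(&mut self, current: SystemTime) -> bool {
        let changed = self.last_mtime != current;
        self.last_mtime = current;
        changed
    }
}

/// Stat the file and report whether the relation should be rebuilt
/// before the next page is processed.
fn needs_reload(tracker: &mut ReloadTracker, path: &Path) -> io::Result<bool> {
    let mtime = fs::metadata(path)?.modified()?;
    Ok(tracker.observe(mtime))
}
```

The comparison is "different", not "newer", so a file restored from backup with an older mtime would also trigger a reload in this sketch.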

Reserved name

The name batch is reserved for the page relation. Using it as a relation name is a compile-time error.

Per-page semantics and batch_size: 0

This is the most important thing to understand about the SQL transform.

The transform runs once per page, not once across the whole stream. With the default batch_size of 1000, GROUP BY and window functions aggregate within a single 1000-row page — not across all pages.

# BAD: GROUP BY runs per-page, giving partial aggregates.
pipeline:
  source:
    type: csv
    config:
      path: data/orders.csv
  transforms:
    - type: sql
      config:
        query: "SELECT country, SUM(amount) AS total FROM batch GROUP BY country"

To aggregate across the whole dataset, set the source's batch_size: 0 so the entire result set arrives as one page:

# CORRECT: batch_size: 0 loads the whole file as one page → global GROUP BY.
pipeline:
  source:
    type: csv
    config:
      path: data/orders.csv
      batch_size: 0
  transforms:
    - type: sql
      config:
        query: "SELECT country, SUM(amount) AS total FROM batch GROUP BY country"

batch_size: 0 means "no batching" — the source emits the entire result set as a single StreamPage. All sources support it; it is appropriate for small lookup tables and for aggregating transforms like this one.
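
To see why the page boundary matters, the following Rust sketch simulates the same GROUP BY over pages of a given size. It does not use faucet or DuckDB; sum_by_country and aggregate_per_page are hypothetical helpers that only mimic the per-page behaviour described above.

```rust
use std::collections::BTreeMap;

/// Sum `amount` per country within one page, like
/// `SELECT country, SUM(amount) FROM batch GROUP BY country`.
fn sum_by_country(page: &[(&str, f64)]) -> BTreeMap<String, f64> {
    let mut totals: BTreeMap<String, f64> = BTreeMap::new();
    for &(country, amount) in page {
        *totals.entry(country.to_string()).or_insert(0.0) += amount;
    }
    totals
}

/// Run the aggregation once per page. A `batch_size` of 0 means
/// "no batching": all rows arrive as a single page.
fn aggregate_per_page(rows: &[(&str, f64)], batch_size: usize) -> Vec<BTreeMap<String, f64>> {
    if batch_size == 0 {
        return vec![sum_by_country(rows)];
    }
    rows.chunks(batch_size).map(sum_by_country).collect()
}
```

With the rows ("DE", 1.0), ("US", 2.0), ("DE", 3.0), ("US", 4.0), a batch size of 2 yields two result sets with partial totals (DE=1, US=2 and DE=3, US=4), while a batch size of 0 yields one result set with the global totals (DE=4, US=6).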

When an aggregating query receives a second page (i.e. batch_size was not set to 0), faucet emits a one-time warning:

WARN faucet::transform::sql: sql transform with aggregation received multiple pages;
aggregation is per-page — set batch_size: 0 for global aggregation

Error handling and validation

At config load time (faucet validate / start of faucet run):

  • The query is parse/bind-checked inside DuckDB. Syntax errors report line and column number.
  • Reference-relation files that do not exist cause an immediate error (before any page is processed).
  • A relation named batch is rejected.
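
Two of the load-time checks above (the reserved name and the missing-file check) can be sketched in Rust as follows. RelationSpec and validate_relations are hypothetical names used only for illustration; the error wording and the exact, case-sensitive name comparison are assumptions, not faucet's actual behaviour.

```rust
use std::path::Path;

/// Hypothetical, simplified view of one `relations` entry.
struct RelationSpec<'a> {
    name: &'a str,
    /// `None` for inline `values` sources, which read no file.
    path: Option<&'a str>,
}

/// Fail on the first invalid relation, before any page is processed.
fn validate_relations(relations: &[RelationSpec<'_>]) -> Result<(), String> {
    for rel in relations {
        // `batch` always refers to the page relation.
        if rel.name == "batch" {
            return Err("relation name `batch` is reserved for the page relation".to_string());
        }
        // File-backed sources must exist at load time.
        if let Some(path) = rel.path {
            if !Path::new(path).exists() {
                return Err(format!("relation `{}`: file not found: {}", rel.name, path));
            }
        }
    }
    Ok(())
}
```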

At runtime (per page):

  • A query that fails mid-run aborts the pipeline immediately (FaucetError::Transform).
  • Runtime query errors are not routed to the dead-letter queue — they follow the same fail-fast policy as every other built-in transform.

Empty result sets are valid: a query that matches zero rows produces zero output records for that page.

Runnable example

The file cli/examples/csv_to_jsonl_sql.yaml groups order CSV data by country and joins to a reference countries CSV, writing JSONL output:

version: 1
name: csv_to_jsonl_sql

pipeline:
  source:
    type: csv
    config:
      path: cli/examples/data/orders.csv
      has_header: true
      batch_size: 0          # whole file as one page → global GROUP BY

  transforms:
    - type: sql
      config:
        query: |
          SELECT c.country,
                 COUNT(*)                     AS order_count,
                 SUM(CAST(o.amount AS DOUBLE)) AS total_amount
          FROM   batch o
          LEFT JOIN countries c ON o.country_code = c.code
          GROUP BY c.country
          ORDER BY c.country
        relations:
          - name: countries
            source:
              type: csv
              path: cli/examples/data/countries.csv
              has_header: true

  sink:
    type: jsonl
    config:
      path: /tmp/faucet_sql_demo.jsonl

Run it (requires the transform-sql, source-csv, and sink-jsonl features):

faucet run cli/examples/csv_to_jsonl_sql.yaml

Output rows look like:

{"country":"Germany","order_count":1,"total_amount":3.0}
{"country":"India","order_count":1,"total_amount":7.0}
{"country":"United States","order_count":2,"total_amount":15.5}

Feature flag

transform-sql — not in the default build; included in full.

# Enable only the SQL transform
faucet-stream = { version = "1.0", features = ["transform-sql"] }

# Enable together with specific connectors
faucet-stream = { version = "1.0", features = ["source-csv", "sink-jsonl", "transform-sql"] }

# Enable everything
faucet-stream = { version = "1.0", features = ["full"] }

CLI:

cargo install faucet-cli --features transform-sql

JSON columns

DuckDB's json_extract works on string or JSON columns. If a field is a JSON string, use it directly:

SELECT json_extract(payload, '$.user.id') AS user_id FROM batch

Or cast it first:

SELECT json_extract(payload::JSON, '$.user.id') AS user_id FROM batch

Timestamp / timezone note

DuckDB's TIMESTAMP type is timezone-naive. faucet JSON timestamps are RFC 3339 strings (e.g. "2026-01-01T12:00:00Z"). To compare or cast them:

-- Parse an RFC 3339 string into a DuckDB TIMESTAMP (drops the offset)
SELECT CAST(created_at AS TIMESTAMP) AS ts FROM batch
WHERE CAST(created_at AS TIMESTAMP) > '2026-01-01'::TIMESTAMP

-- Keep the string form, compare lexicographically (safe only for UTC "Z" strings of uniform precision)
SELECT * FROM batch WHERE created_at > '2026-01-01T00:00:00Z'

If your timestamps carry non-UTC offsets, the lexicographic comparison above is no longer reliable. Either normalise them to UTC with cast before they reach the SQL transform, or parse them inside the query with strptime.
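
The limits of string comparison can be shown with a small Rust sketch. Both function names are hypothetical, and the check assumes canonical RFC 3339 strings (uppercase T and Z, zero-padded fields). It treats two timestamps as comparable only when both end in Z and have the same length, because an offset or a differing number of fractional digits breaks byte-wise ordering.

```rust
/// True when both strings are in UTC "Z" form with identical length,
/// so byte-wise ordering matches chronological ordering.
fn lexicographically_comparable(a: &str, b: &str) -> bool {
    a.ends_with('Z') && b.ends_with('Z') && a.len() == b.len()
}

/// Compare as strings only when that is safe; otherwise return None
/// so the caller can fall back to real timestamp parsing.
fn is_after(ts: &str, cutoff: &str) -> Option<bool> {
    if lexicographically_comparable(ts, cutoff) {
        Some(ts > cutoff)
    } else {
        None
    }
}
```

For example, "2026-01-01T01:00:00+02:00" sorts after "2026-01-01T00:00:00Z" as a string even though it denotes an earlier instant (2025-12-31T23:00:00Z), and "2026-01-01T00:00:00.5Z" sorts before "2026-01-01T00:00:00Z" because '.' precedes 'Z' in byte order. is_after returns None in both cases.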

Library usage

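use faucet_transform_sql::{SqlTransform, SqlTransformConfig};
use faucet_core::stage::{compile_stage, apply_stages_to_page};

// No reference relations; memory_limit and threads are left at DuckDB's defaults.
let cfg = SqlTransformConfig {
    query: "SELECT id, upper(name) AS name FROM batch".into(),
    relations: vec![],
    memory_limit: None,
    threads: None,
};

// Compile the transform, then wrap it as a page stage.
// The `?` operators assume an enclosing function that returns a compatible Result.
let t = SqlTransform::compile(&cfg)?;
let stage = compile_stage(&t.into_page_stage())?;

// `records` is the input page, supplied by the caller; the query result replaces it.
let output = apply_stages_to_page(records, &[stage])?;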

License

Licensed under either of Apache License 2.0 or MIT license at your option.