flusso-daemon 0.3.1

Daemon that assembles and runs the flusso pipeline from a Config.

flusso

🤖 Generative AI disclosure

Generative AI was used in this project to produce boilerplate and documentation. Every single line of code has been manually reviewed and revised by a human software developer who can be blamed accordingly.

Keep OpenSearch in sync with Postgres, driven by declarative config.

You write a bit of YAML describing what a search document should look like. flusso builds the index, seeds it from your existing rows, then tails Postgres' logical replication stream so the index stays current — no cron job, no nightly reindex, no hand-rolled for row in rows: es.index(...) script you'll regret at 2am.

In short: you describe the what, flusso handles the keep-it-in-sync.

Quickstart

The dev/ directory is a complete, runnable example — a docker-compose stack (Postgres wired for logical replication, OpenSearch, Dashboards, Prometheus, Grafana), seeded data, and a matching config. With just installed (cargo install just --locked):

just up        # bring the whole stack up and wait for it to be healthy
just check     # validate the config + schemas against the database
just run       # backfill OpenSearch, then follow live changes (serves /status + /metrics)

Then, in another terminal, poke it and watch changes stream through:

just psql                                            # make some changes
curl -s 'localhost:9200/users/_search?pretty'        # see them land in OpenSearch (quoted so the shell leaves the ? alone)
just status                                          # live pipeline status

That's it. Run just on its own to see every recipe. The full walk-through — resetting state, inspecting the slot, OpenSearch Dashboards on :5601 — lives in dev/README.md.

No just? Every recipe is just a thin wrapper; the raw cargo run -- … and docker compose … commands are right there in the justfile.

What it does

A deployment is two kinds of files. That's the whole mental model.

flusso.toml — one per deployment. Where the data comes from, where it goes, and which indexes to build.

[source]
type = "postgres"
connection_url = { env = "DATABASE_URL" }   # secrets stay as env refs, never baked in

[sinks.primary]
type = "opensearch"
url = "https://localhost:9200"
password = { env = "OS_PASSWORD" }

[sinks.audit]          # sinks fan out — write the same docs to more than one place
type = "stdout"
pretty = true

[[index]]
name = "users"
schema = "users.schema.yml"

*.schema.yml — one per index. What a single search document looks like: its table, its fields, and the related tables that fold in.

table: users
primary_key: id

soft_delete:
  column: deleted          # users.deleted = true → tombstone instead of upsert

fields:
  - keyword: email
    transforms: [lowercase, trim]

  # Fold each user's recent orders in as a nested array.
  - has_many: orders
    table: orders
    foreign_key: user_id
    primary_key: id
    order_by: [{ column: created_at, direction: desc }]
    limit: 5
    fields:
      - double: total
      - keyword: status

  # ...or just count them. A count is always a long.
  - count: orderCount
    table: orders
    foreign_key: user_id

Every field declares its type from a fixed set (SCHEMA.md lists them all) that bridges a Postgres column and an OpenSearch mapping. So a schema is self-describing: flusso derives the full index mapping — and validates your config — without ever touching a database.
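
To make "derives the mapping without a database" concrete, here is a minimal sketch. It assumes a three-type subset (keyword, double, long) and a hypothetical `derive_mapping` helper; the names, the error type, and the naive JSON rendering (no escaping) are illustrative and are not flusso's actual API.

```rust
/// A tiny subset of declared field types (the real set is listed in SCHEMA.md).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FieldType {
    Keyword,
    Double,
    Long,
}

impl FieldType {
    /// Parse the key used in a schema file, e.g. `keyword: email`.
    pub fn parse(key: &str) -> Result<Self, String> {
        match key {
            "keyword" => Ok(Self::Keyword),
            "double" => Ok(Self::Double),
            "long" => Ok(Self::Long),
            other => Err(format!("unknown field type: {}", other)),
        }
    }

    /// The OpenSearch mapping type this declared type corresponds to.
    pub fn opensearch_type(self) -> &'static str {
        match self {
            Self::Keyword => "keyword",
            Self::Double => "double",
            Self::Long => "long",
        }
    }
}

/// Build a mapping body from `(type_key, field_name)` pairs.
/// Only the declarations are consulted; no database is involved.
pub fn derive_mapping(fields: &[(&str, &str)]) -> Result<String, String> {
    let mut properties = Vec::new();
    for (type_key, name) in fields {
        let field_type = FieldType::parse(type_key)?;
        properties.push(format!(
            "\"{}\":{{\"type\":\"{}\"}}",
            name,
            field_type.opensearch_type()
        ));
    }
    Ok(format!("{{\"properties\":{{{}}}}}", properties.join(",")))
}
```

Calling `derive_mapping(&[("keyword", "email")])` yields a mapping body, while an undeclared type such as `varchar` is rejected before anything connects to Postgres.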

The neat part: change a user or one of their orders and flusso rebuilds the whole users document and re-emits it. It figures out which documents a changed row affects, reassembles each, and writes it by a deterministic id. You don't tell it what to update; it works it out.
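
The "works it out" step can be pictured roughly like this. The sketch assumes the users/orders schema above and a hypothetical `Change` type; it also assumes the document id is simply the root primary key rendered as text, which this README does not confirm.

```rust
use std::collections::BTreeSet;

/// A changed row, reduced to what this sketch needs (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub enum Change {
    /// A row in the root `users` table changed.
    User { id: i64 },
    /// A row in the child `orders` table changed; `user_id` is its foreign key.
    Order { id: i64, user_id: i64 },
}

/// Map a batch of changes to the set of root document ids to rebuild.
/// The set dedups: two changed orders for one user rebuild that user once.
pub fn affected_user_ids(changes: &[Change]) -> BTreeSet<i64> {
    changes
        .iter()
        .map(|change| match change {
            Change::User { id } => *id,
            Change::Order { user_id, .. } => *user_id,
        })
        .collect()
}

/// Deterministic document id: the same root row always yields the same id,
/// so a rewrite replaces the document instead of adding a copy.
/// (Assumption: the id is the primary key rendered as text.)
pub fn document_id(user_id: i64) -> String {
    user_id.to_string()
}
```

A real resolver also has to handle an order whose `user_id` itself changed, in which case both the old and the new user are affected; the sketch leaves that out.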

How the pipeline works

The engine wires pluggable edges together and runs the loop:

ChangeCapture ─▶ queue ─▶ resolve ─▶ build ─▶ Sink ─▶ flush ─▶ ack

A capture task drains the source's change stream into a bounded queue (full queue → capture blocks; back-pressure for free). A worker pulls changes, works out which document ids they touch, builds each document, and writes it. Writes are batched — N changes, or whatever shows up in a short window, flush together as one bulk round-trip.
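
A rough illustration of the bounded queue and the "N changes or a short window" batching rule. It uses the standard library's `sync_channel` as a stand-in for the tokio channel flusso actually uses; `next_batch` and `run_demo` are hypothetical names, and the 20 ms window is an arbitrary choice.

```rust
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collect up to `max` items, or whatever arrives within `window` of the first one.
/// Returns an empty batch once the sender has hung up and the queue is drained.
pub fn next_batch<T>(rx: &Receiver<T>, max: usize, window: Duration) -> Vec<T> {
    let mut batch = Vec::new();
    // Block for the first item; an Err here means the sender is gone.
    match rx.recv() {
        Ok(item) => batch.push(item),
        Err(_) => return batch,
    }
    let deadline = Instant::now() + window;
    while batch.len() < max {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Window elapsed or sender gone: flush what we have.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    batch
}

/// Run a capture thread against a bounded queue and return the batches the worker saw.
pub fn run_demo(total: u32, capacity: usize, max_batch: usize) -> Vec<Vec<u32>> {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let capture = std::thread::spawn(move || {
        for change in 0..total {
            // `send` blocks while the queue is full: that is the back-pressure.
            tx.send(change).expect("worker hung up");
        }
    });
    let mut batches = Vec::new();
    loop {
        let batch = next_batch(&rx, max_batch, Duration::from_millis(20));
        if batch.is_empty() {
            break;
        }
        batches.push(batch);
    }
    capture.join().expect("capture thread panicked");
    batches
}
```

With a capacity of 2 the capture thread can never run more than two changes ahead of the worker, which is the whole point of bounding the queue.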

Delivery is at-least-once — not exactly-once, because exactly-once is mostly a story distributed systems tell at conferences. A change's ack is confirmed only after the flush that made its document durable, so the replication slot advances exactly when the data has landed. Crash before the flush? The batch is redelivered on restart and re-applied idempotently — same id, same result, no duplicates.
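
The ordering matters more than the machinery, so here is a small in-memory sketch of it. `MemorySink`, `Slot`, and `DocWrite` are hypothetical stand-ins; the real sink talks to OpenSearch and the real slot lives in Postgres.

```rust
use std::collections::BTreeMap;

/// One document write, tagged with the source position it came from.
pub struct DocWrite {
    pub lsn: u64,
    pub id: String,
    pub body: String,
}

/// An in-memory stand-in for a sink keyed by document id.
#[derive(Default)]
pub struct MemorySink {
    pub docs: BTreeMap<String, String>,
}

impl MemorySink {
    /// Upsert by id: applying the same write twice leaves one document.
    pub fn flush(&mut self, batch: &[DocWrite]) {
        for write in batch {
            self.docs.insert(write.id.clone(), write.body.clone());
        }
    }
}

/// Tracks the highest position confirmed back to the source.
#[derive(Default)]
pub struct Slot {
    pub confirmed_lsn: u64,
}

/// Flush first, acknowledge second. A crash between the two steps leaves
/// `confirmed_lsn` behind, so the batch is redelivered instead of lost.
pub fn apply(sink: &mut MemorySink, slot: &mut Slot, batch: &[DocWrite]) {
    sink.flush(batch);
    if let Some(max_lsn) = batch.iter().map(|write| write.lsn).max() {
        slot.confirmed_lsn = slot.confirmed_lsn.max(max_lsn);
    }
}
```

Flushing a batch, "crashing" before the ack, and then applying the same batch again ends with the same documents and no extras, because every write is an upsert on a stable id.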

Before going live, the engine runs an optional backfill: it asks each sink whether an index is already seeded and snapshots the tables for the ones that aren't. Whether a backfill is needed is the destination's call, not the source's.

Queue, source, sink, and document builder are all trait objects — so WAL-vs-polling, stdout-vs-OpenSearch, in-process-channel-vs-durable-broker are all swappable without touching the engine loop. (Today the menu is short: Postgres in, OpenSearch out. The seams are there for when it isn't.)
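
As a sketch of how a trait-object seam and the "destination decides" backfill rule fit together: the `Sink` trait below is trimmed to a single hypothetical method, and the two implementations (including the assumption that a print-only sink always reports itself as seeded) are illustrative, not flusso's real types.

```rust
/// The seam a destination implements (hypothetical, trimmed to one question).
pub trait Sink {
    /// Has this sink already been seeded with `index`?
    fn is_seeded(&self, index: &str) -> bool;
}

/// A sink that only prints. Assumption: it never asks for a backfill.
pub struct Stdout;

impl Sink for Stdout {
    fn is_seeded(&self, _index: &str) -> bool {
        true
    }
}

/// A sink that remembers which indexes it has seeded.
pub struct Search {
    pub seeded: Vec<String>,
}

impl Sink for Search {
    fn is_seeded(&self, index: &str) -> bool {
        self.seeded.iter().any(|name| name == index)
    }
}

/// Indexes that at least one sink reports as unseeded, in config order.
/// The engine only sees `dyn Sink`, so new sink kinds need no change here.
pub fn backfill_plan<'a>(sinks: &[Box<dyn Sink>], indexes: &[&'a str]) -> Vec<&'a str> {
    indexes
        .iter()
        .copied()
        .filter(|index| sinks.iter().any(|sink| !sink.is_seeded(index)))
        .collect()
}
```

Given a `Stdout` sink plus a `Search` sink that already holds `users`, a plan over `users` and `orders` contains only `orders`.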

The CLI

Three subcommands, and every flag also reads a FLUSSO_* env var (the flag wins when both are set) — handy for containers.

  • flusso build: compile a config and its schemas into one portable flusso.lock. It needs no database and bakes in no secrets ({ env = "VAR" } refs are carried through and resolved wherever the lock runs). Ship one file instead of a tree of YAML.
  • flusso run: stream changes through the engine. Like cargo run, it compiles first: when a flusso.toml is present (at the default path, or given with --config) it recompiles and rewrites flusso.lock, then runs, so the committed lock stays current for free. With no config it loads the existing flusso.lock, and --locked runs the lock as-is without recompiling. Credentials are resolved here, in the running environment.
  • flusso check: validate the config and print the fully typed mapping. With --offline it needs no database; without it, flusso also confirms that the declared types match the live database and grumbles about any that don't.

flusso --help
flusso build  --config flusso.toml -o flusso.lock   # build the portable artifact
flusso check  --config flusso.toml                  # validate (+ check vs database)
flusso check  --config flusso.toml --offline        # validate without a database
flusso run                                          # recompile if flusso.toml is present, else run flusso.lock
flusso run    --config flusso.toml                  # compile from source and run
flusso run    --skip-backfill                       # resume live capture only

Logging honors RUST_LOG (default info); FLUSSO_LOG_FORMAT=json for structured logs. Set the standard OTEL_EXPORTER_OTLP_ENDPOINT and traces export there too. Every environment variable flusso reads — secrets, the FLUSSO_* flags, telemetry — is collected in one place: CONFIG.md.
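
The precedence rule for flags and their FLUSSO_* counterparts (flag first, then environment, then a built-in default) fits in one function. This is only a sketch: `resolve` is a hypothetical helper, and the values are passed in as plain options so it can be exercised without touching the real environment.

```rust
/// Resolve one setting: the CLI flag wins, then the environment, then the default.
pub fn resolve(flag: Option<&str>, env: Option<&str>, default: &str) -> String {
    flag.or(env).unwrap_or(default).to_string()
}
```

In a real binary the `env` argument would come from something like `std::env::var("FLUSSO_LOG_FORMAT").ok()`, with the flag value supplied by the argument parser.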

just commands

Common workflows are wrapped in the justfile. Run just for the full menu; the greatest hits:

Recipe                                  Does
just up / just down / just reset        Start / stop / wipe-and-restart the dev stack
just check / just check-offline         Validate config + schemas (with / without a DB)
just run / just run-live                Backfill + follow / resume live only
just build-lock                         Compile a portable flusso.lock
just demo                               Run flusso inside the cluster (no host toolchain)
just status / just metrics / just eta   Live status / raw metrics / backlog drain ETA
just psql / just grafana                psql shell / open the Grafana dashboard
just test / just test-all / just doc    Fast tests / + Postgres e2e / doctests
just lint / just fmt / just ci          Lint / format / the full local CI gate

Requirements

flusso doesn't own Postgres or OpenSearch — it's a guest in both. A few things have to be true before it can run. The dev/ stack sets all of this up for you; here's what you'd replicate against your own infrastructure. Full per-source/per-sink options are in SOURCES_AND_SINKS.md.

Postgres (the source):

  • PG 14+ with wal_level = logical (a restart-required setting), and max_wal_senders / max_replication_slots high enough for flusso plus any other consumers.
  • A publication covering every table any index reads: root tables and every table a join or aggregate pulls from. flusso derives that set from your schema and manages the publication for you when the source role is privileged enough: it creates the publication on first connect and extends it as you add tables, just as it does for the slot. The catch is privilege. Creating or extending a publication needs table ownership plus CREATE on the database (or superuser), a stronger grant than the read-only role below. If the role lacks that grant, flusso doesn't fail: it logs the exact CREATE PUBLICATION / ALTER PUBLICATION … ADD TABLE to run, and flusso check prints the same. Set [source] manage_publication = false (or FLUSSO_MANAGE_PUBLICATION=false in the environment) to turn management off and manage the publication yourself.
  • A replication slot — this one flusso always creates on first connect (it needs only the REPLICATION attribute). Heads-up: Postgres hoards WAL until flusso confirms it, so a flusso that's down for a long time means WAL piling up on the server. Drop the slot when you retire a deployment, unless you're a fan of disk-full pages.
  • Row identity on every replicated table — a primary key (usual case) or an explicit REPLICA IDENTITY. Keyless tables can't be addressed, so flusso skips them in backfill and errors on a live change it can't key.
  • A role with REPLICATION + SELECT on the published tables, via the usual postgres://user:pass@host:port/db URL. That's enough to stream and to create the slot; for flusso to also manage the publication (above) the role must own those tables and hold CREATE on the database — otherwise flusso just prints the SQL for you to run with a privileged role.
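
To show what "derives that set from your schema" could look like, here is a minimal sketch that collects every table a set of index schemas reads and renders the statement a privileged role would run. The struct shapes and the publication name `flusso_pub` are assumptions for illustration, and identifiers are not quoted or escaped.

```rust
use std::collections::BTreeSet;

/// A joined or aggregated table referenced by an index (hypothetical shape).
pub struct Relation {
    pub table: String,
}

/// One index schema: its root table plus the related tables it folds in.
pub struct IndexSchema {
    pub table: String,
    pub relations: Vec<Relation>,
}

/// Every table the publication must cover, deduplicated and sorted.
pub fn published_tables(schemas: &[IndexSchema]) -> BTreeSet<String> {
    let mut tables = BTreeSet::new();
    for schema in schemas {
        tables.insert(schema.table.clone());
        for relation in &schema.relations {
            tables.insert(relation.table.clone());
        }
    }
    tables
}

/// Render the statement to run by hand; `None` when there is nothing to publish.
pub fn create_publication_sql(publication: &str, tables: &BTreeSet<String>) -> Option<String> {
    if tables.is_empty() {
        return None;
    }
    let list = tables.iter().map(String::as_str).collect::<Vec<_>>().join(", ");
    Some(format!("CREATE PUBLICATION {} FOR TABLE {};", publication, list))
}
```

For the users schema shown earlier, with `orders` referenced by both a has_many and a count, the set is just `orders` and `users`.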

OpenSearch (the sink):

  • OpenSearch 2.x (also speaks Elasticsearch 7.x on the query side via flusso-query).
  • A reachable HTTP(S) endpoint as the sink url. Optional HTTP basic auth (username / password); tls_verify defaults to true — flip it off only for self-signed dev clusters.
  • A user that can manage and write flusso's indexes plus the small hidden flusso_meta index where it records "this one's seeded". flusso owns the index lifecycle: it derives a strict typed mapping per schema and names each index from a hash of that schema (users_<hash>), so a structural change rolls onto a fresh index and re-seeds instead of fighting a mismatched one. The plain logical name (users) is kept as an alias on the current index, so you can always query it without knowing the hash.
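
The hashed index name is easy to picture with a small sketch. The hash function here is FNV-1a (64-bit), picked only because it is short and stable across runs; the algorithm flusso actually uses, the hash length, and the `physical_index_name` helper are all assumptions.

```rust
/// 64-bit FNV-1a, used here purely as a stable stand-in hash.
pub fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

/// Name the physical index after the logical name plus a hash of its mapping.
/// A structural change produces a different hash, hence a fresh index.
pub fn physical_index_name(logical: &str, mapping_json: &str) -> String {
    format!("{}_{:016x}", logical, fnv1a64(mapping_json.as_bytes()))
}
```

The same mapping always produces the same name, so a restart finds the existing index, while the alias `users` is what queries keep pointing at as the hashed name changes underneath.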

Deploying it

  • Container image — the Dockerfile builds a registry-ready, config-less image (you mount a config or bake your own flusso.lock). It also has a demo target with the dev config baked in, which is what just demo runs. DEPLOY.md has the recipes for shipping the smallest possible image — bake your own lock, or compile one in-Docker even when your schemas are scattered across a monorepo.
  • Kubernetes — the Helm chart deploys flusso as a single instance (it consumes one replication slot, so it's firmly a party of one) with config via ConfigMap, secrets via env, a Service, and an optional Prometheus ServiceMonitor. See its README.

Docs

Doc                           What's in it
SCHEMA.md                     Every config + schema key: field types, joins, aggregates, filters, validation rules
SOURCES_AND_SINKS.md          Every source and sink option: batch sizes, retries, analysis modes
CONFIG.md                     Every environment variable in one place: secrets, FLUSSO_* flags, logging & telemetry
CLIENT.md                     flusso-query, the typed query-side client and its #[derive(FlussoDocument)]
DEPLOY.md                     Docker recipes: smallest image, baking/compiling a flusso.lock, scoped .dockerignore
dev/README.md                 The dev stack walk-through
deploy/helm/flusso/README.md  The Helm chart
CLAUDE.md                     Architecture + contributor notes (also where the AI takes its instructions)

Project layout

Crates live under libs/ and apps/. The numeric prefix is the dependency layer — a crate only depends on lower-numbered ones.

Crate               Path                         Role
schema-core         libs/0-core                  The validated domain model every other crate produces and consumes; the sole layer-0 crate.
schema-config-toml  libs/2-schema/1-config-toml  Parses flusso.toml → neutral entities.
schema-index-yaml   libs/2-schema/1-index-yaml   Parses *.schema.yml → core types.
schema              libs/2-schema                Config entry point. load() reads a config + its schemas into one validated Config.
queue-core          libs/1-queue/0-core          The work-queue abstraction, generic over the payload.
queue-channel       libs/1-queue/1-channel       In-process queue over a bounded tokio mpsc channel.
sources-core        libs/1-sources/0-core        Source abstractions: cdc (what changed?) and document (what to build?).
sources-postgres    libs/1-sources/1-postgres    Postgres source: WAL capture, backfill, document building.
sinks-core          libs/1-sinks/0-core          The Sink trait, JSON rendering, and a fan-out sink.
sinks-stdout        libs/1-sinks/1-stdout        Writes each operation to stdout (NDJSON or pretty).
sinks-opensearch    libs/1-sinks/2-opensearch    OpenSearch sink: bulk API, typed mappings, hashed index names + latest-alias, seeding markers.
engine              libs/2-engine                The sync engine: capture → queue → build → sink, batched, at-least-once.
daemon              libs/3-daemon                Wires a Config into a running pipeline and exposes live Status.
flusso-cli          apps/cli                     The flusso binary: transport, telemetry, signals.

Each config-format crate works in two stages — parse (serde → permissive entity types, unknown fields rejected) then convert (lift into schema-core, apply the rules the format can't express). Secrets are not resolved here, so a compiled config never carries a secret it wasn't literally given.
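
One way to picture "secrets are not resolved here" is a value that stores the reference instead of the secret. The `SecretRef` enum and its `resolve` method are hypothetical; the lookup is passed in as a closure so the sketch can be tested without reading real environment variables.

```rust
/// A credential as it might appear in a compiled config (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
pub enum SecretRef {
    /// A value written literally in the config.
    Literal(String),
    /// A reference such as `{ env = "OS_PASSWORD" }`, stored by name only.
    Env(String),
}

impl SecretRef {
    /// Resolve at run time. `lookup` stands in for the process environment.
    pub fn resolve<F>(&self, lookup: F) -> Result<String, String>
    where
        F: Fn(&str) -> Option<String>,
    {
        match self {
            SecretRef::Literal(value) => Ok(value.clone()),
            SecretRef::Env(name) => lookup(name)
                .ok_or_else(|| format!("environment variable {} is not set", name)),
        }
    }
}
```

At run time the call would look like `secret.resolve(|name| std::env::var(name).ok())`; until then the compiled artifact only ever holds the variable's name.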

Testing & development

Tests run with cargo-nextest (cargo install cargo-nextest --locked):

just test        # fast: unit + parse/convert, no external deps
just test-all    # + Postgres e2e (needs Docker; spins up containers via testcontainers)
just doc         # doctests — nextest doesn't run these
just ci          # the full local gate: lint → e2e → doctests

The e2e tests are #[ignore]d by default and legitimately slow/flaky, so .config/nextest.toml caps their concurrency and retries them. A few crates also carry Criterion benchmarks (cargo bench) — the engine, the OpenSearch sink, and the Postgres source.

Editor support: point your editor at libs/2-schema/1-config-toml/config.schema.json and libs/2-schema/1-index-yaml/index.schema.yml for completion and inline validation — the bundled example schemas already reference them via a yaml-language-server modeline.

Want to hack on it? CLAUDE.md has the architecture tour and the house rules (the lints are strict — no unwrap, no println!, and yes, they fail the build).

License

Licensed under the Apache License, Version 2.0. Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in this work shall be licensed as above, without any additional terms or conditions.