flusso
[!IMPORTANT]
🤖 Generative AI disclosure
Generative AI was used in this project to produce boilerplate and documentation. Every single line of code has been manually reviewed and revised by a human software developer who can be blamed accordingly.
Keep OpenSearch in sync with Postgres, driven by declarative config.
You write a bit of YAML describing what a search document should look like.
flusso builds the index, seeds it from your existing rows, then tails Postgres'
logical replication stream so the index stays current — no cron job, no nightly
reindex, no hand-rolled for row in rows: es.index(...) script you'll regret at
2am.
In short: you describe the what, flusso handles the keep-it-in-sync.
Contents
- Quickstart: running in about five commands
- What it does: the two files you write
- How the pipeline works: the bit that does the work
- The CLI: build, check, run
- just commands: the shortcuts you'll actually use
- Requirements: what Postgres and OpenSearch need first
- Deploying it: Docker image + Helm chart
- Docs: everything else, linked
- Project layout: where the code lives
- Testing & development
Quickstart
The dev/ directory is a complete, runnable example — a docker-compose
stack (Postgres wired for logical replication, OpenSearch, Dashboards,
Prometheus, Grafana), seeded data, and a matching config. With
just installed (cargo install just --locked):
Then, in another terminal, poke it and watch changes stream through:
That's it. Run just on its own to see every recipe. The full walk-through —
resetting state, inspecting the slot, OpenSearch Dashboards on :5601 — lives in
dev/README.md.
No just? Every recipe is just a thin wrapper; the raw cargo run -- … and docker compose … commands are right there in the justfile.
What it does
A deployment is two kinds of files. That's the whole mental model.
flusso.toml — one per deployment. Where the data comes from, where it goes,
and which indexes to build.
[]
= "postgres"
= { = "DATABASE_URL" } # secrets stay as env refs, never baked in
[]
= "opensearch"
= "https://localhost:9200"
= { = "OS_PASSWORD" }
[] # sinks fan out — write the same docs to more than one place
= "stdout"
= true
[[]]
= "users"
= "users.schema.yml"
*.schema.yml — one per index. What a single search document looks like:
its table, its fields, and the related tables that fold in.
table: users
primary_key: id
soft_delete:
  column: deleted   # users.deleted = true → tombstone instead of upsert
fields:
  - keyword: email
transforms:
  # Fold each user's recent orders in as a nested array.
  - has_many: orders
    table: orders
    foreign_key: user_id
    primary_key: id
    order_by:
    limit: 5
    fields:
      - double: total
      - keyword: status
  # ...or just count them. A count is always a long.
  - count: orderCount
    table: orders
    foreign_key: user_id
Every field declares its type from a fixed set (SCHEMA.md lists
them all) that bridges a Postgres column and an OpenSearch mapping. So a schema is
self-describing: flusso derives the full index mapping — and validates your config
— without ever touching a database.
The neat part: change a user or one of their orders and flusso rebuilds the
whole users document and re-emits it. It figures out which documents a changed
row affects, reassembles each, and writes it by a deterministic id. You don't tell
it what to update; it works it out.
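To make the "works it out" step concrete, here is a minimal sketch of resolving changed rows to the users documents they touch. The Change type and affected_user_ids are hypothetical names for illustration, not flusso's API, and the sketch assumes an orders row carries its user_id.

```rust
/// A changed row, reduced to what resolution needs (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub enum Change {
    /// A row in the root `users` table changed.
    User { id: i64 },
    /// A row in the child `orders` table changed.
    Order { id: i64, user_id: i64 },
}

/// Map a batch of changes to the sorted, de-duplicated ids of the
/// `users` documents that have to be rebuilt.
pub fn affected_user_ids(changes: &[Change]) -> Vec<i64> {
    let mut ids: Vec<i64> = changes
        .iter()
        .map(|change| match change {
            // A root row maps to its own document.
            Change::User { id } => *id,
            // A child row maps to the document of the user it belongs to.
            Change::Order { user_id, .. } => *user_id,
        })
        .collect();
    ids.sort_unstable();
    ids.dedup();
    ids
}
```

Two order changes for the same user collapse into one rebuild, which is why the document is written whole under a deterministic id rather than patched field by field.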
How the pipeline works
The engine wires pluggable edges together and runs the loop:
ChangeCapture ─▶ queue ─▶ resolve ─▶ build ─▶ Sink ─▶ flush ─▶ ack
A capture task drains the source's change stream into a bounded queue (full queue → capture blocks; back-pressure for free). A worker pulls changes, works out which document ids they touch, builds each document, and writes it. Writes are batched — N changes, or whatever shows up in a short window, flush together as one bulk round-trip.
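The queue-and-batch behaviour can be sketched with the standard library alone. This is an illustration under assumptions, not the engine's code: it uses std's sync_channel rather than the tokio channel the project uses, and next_batch and drain_in_batches are made-up names.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;
use std::time::{Duration, Instant};

/// Collect up to `max` items, or whatever arrives within `window` of the
/// first one. An empty batch means the sender is gone and the queue is drained.
pub fn next_batch<T>(rx: &Receiver<T>, max: usize, window: Duration) -> Vec<T> {
    let mut batch = Vec::new();
    // Block until the first item arrives (or the capture side hangs up).
    match rx.recv() {
        Ok(item) => batch.push(item),
        Err(_) => return batch,
    }
    let deadline = Instant::now() + window;
    while batch.len() < max {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Window elapsed or sender dropped: flush what we have.
            Err(_) => break,
        }
    }
    batch
}

/// Push `total` numbers through a queue of `capacity` and drain them in batches.
pub fn drain_in_batches(total: u32, capacity: usize, max: usize) -> Vec<Vec<u32>> {
    // Bounded queue: `send` blocks while the queue is full (back-pressure).
    let (tx, rx) = sync_channel::<u32>(capacity);
    let capture = thread::spawn(move || {
        for i in 0..total {
            // A send error only means the worker went away; stop capturing.
            if tx.send(i).is_err() {
                break;
            }
        }
    });
    let mut batches = Vec::new();
    loop {
        let batch = next_batch(&rx, max, Duration::from_millis(20));
        if batch.is_empty() {
            break;
        }
        batches.push(batch);
    }
    let _ = capture.join();
    batches
}
```

How the items split across batches depends on timing, but order is preserved and no batch exceeds the size cap.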
Delivery is at-least-once — not exactly-once, because exactly-once is mostly a story distributed systems tell at conferences. A change's ack is confirmed only after the flush that made its document durable, so the replication slot advances exactly when the data has landed. Crash before the flush? The batch is redelivered on restart and re-applied idempotently — same id, same result, no duplicates.
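A toy model of why redelivery is harmless when writes are keyed by a deterministic id. ToySink and deliver are hypothetical, and the "crash" is simulated by flushing the same batch again before the position is acknowledged.

```rust
use std::collections::BTreeMap;

/// A toy sink: documents keyed by a deterministic id (hypothetical type).
#[derive(Default)]
pub struct ToySink {
    pub docs: BTreeMap<String, String>,
}

impl ToySink {
    /// Upsert every document in the batch. Re-applying the same batch
    /// leaves the sink unchanged, which is what makes redelivery safe.
    pub fn flush(&mut self, batch: &[(String, String)]) {
        for (id, body) in batch {
            self.docs.insert(id.clone(), body.clone());
        }
    }
}

/// Deliver `batch`, simulating `crashes` crashes that happen after a flush
/// but before the ack. Returns the confirmed position, which advances only
/// once, after the final successful flush.
pub fn deliver(
    sink: &mut ToySink,
    batch: &[(String, String)],
    position: u64,
    crashes: u32,
) -> u64 {
    for _ in 0..crashes {
        // Flushed, then crashed before acking: the batch comes back on restart.
        sink.flush(batch);
    }
    sink.flush(batch);
    // Ack only now, after the data has landed.
    position + batch.len() as u64
}
```

However many times the batch is replayed, the sink ends up with one document per id.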
Before going live, the engine runs an optional backfill: it asks each sink whether an index is already seeded and snapshots the tables for the ones that aren't. Whether a backfill is needed is the destination's call, not the source's.
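The backfill decision might look roughly like this. SeedState, ToySeedState, and needs_backfill are illustrative names, and treating an index as unseeded when any sink says so is an assumption of this sketch rather than documented behaviour.

```rust
use std::collections::HashSet;

/// What the engine asks a sink before going live (hypothetical trait).
pub trait SeedState {
    fn is_seeded(&self, index: &str) -> bool;
}

/// A toy sink that remembers which indexes it has marked as seeded.
pub struct ToySeedState(pub HashSet<String>);

impl SeedState for ToySeedState {
    fn is_seeded(&self, index: &str) -> bool {
        self.0.contains(index)
    }
}

/// Indexes that need a snapshot: those that at least one sink reports as
/// not yet seeded. The source is never consulted.
pub fn needs_backfill<'a>(sinks: &[&dyn SeedState], indexes: &[&'a str]) -> Vec<&'a str> {
    indexes
        .iter()
        .copied()
        .filter(|index| sinks.iter().any(|sink| !sink.is_seeded(index)))
        .collect()
}
```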
Queue, source, sink, and document builder are all trait objects — so WAL-vs-polling, stdout-vs-OpenSearch, in-process-channel-vs-durable-broker are all swappable without touching the engine loop. (Today the menu is short: Postgres in, OpenSearch out. The seams are there for when it isn't.)
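As a rough picture of what a seam like that buys you, here is a sink trait with a fan-out wrapper. The trait signature is invented for this sketch (the real Sink trait lives in sinks-core and will differ); the point is only that the caller sees one dyn Sink whether there is one destination or several.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// A destination for built documents (hypothetical signature).
pub trait Sink {
    fn write(&mut self, id: &str, doc: &str);
}

/// Records writes in a shared buffer so a caller can inspect them.
pub struct MemorySink(pub Rc<RefCell<Vec<String>>>);

impl Sink for MemorySink {
    fn write(&mut self, id: &str, doc: &str) {
        self.0.borrow_mut().push(format!("{}={}", id, doc));
    }
}

/// Forwards every write to each inner sink, in order.
pub struct FanOut(pub Vec<Box<dyn Sink>>);

impl Sink for FanOut {
    fn write(&mut self, id: &str, doc: &str) {
        for sink in self.0.iter_mut() {
            sink.write(id, doc);
        }
    }
}
```

Because FanOut is itself a Sink, the engine loop does not need to know how many destinations sit behind it.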
The CLI
Three subcommands, and every flag also reads a FLUSSO_* env var (the flag
wins when both are set) — handy for containers.
- flusso build: compile a config and its schemas into one portable flusso.lock. No database, no secrets baked in ({ env = "VAR" } refs are carried through and resolved wherever it runs). Ship one file instead of a tree of YAML.
- flusso run: stream changes through the engine. Like cargo run, it compiles first: when a flusso.toml is present (the default, or --config) it recompiles and rewrites flusso.lock, then runs, so the committed lock stays current for free. With no config it loads the existing flusso.lock, and --locked runs the lock as-is without recompiling. Credentials are resolved here, in the running environment.
- flusso check: validate the config and print the fully-typed mapping. With --offline it needs no database; drop --offline and it also confirms the declared types match the live database and grumbles about any that don't.
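The flag-over-environment precedence is simple enough to sketch. resolve_setting and config_path are illustrative helpers, and FLUSSO_CONFIG is an assumed variable name for the --config flag; CONFIG.md has the real list.

```rust
/// Resolve one setting: the CLI flag wins, then the environment value,
/// then the built-in default.
pub fn resolve_setting(flag: Option<&str>, env_value: Option<&str>, default: &str) -> String {
    flag.or(env_value).unwrap_or(default).to_string()
}

/// Example wiring for the config path. The variable name FLUSSO_CONFIG is
/// an assumption made for this sketch.
pub fn config_path(flag: Option<&str>) -> String {
    let from_env = std::env::var("FLUSSO_CONFIG").ok();
    resolve_setting(flag, from_env.as_deref(), "flusso.toml")
}
```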
Logging honors RUST_LOG (default info); FLUSSO_LOG_FORMAT=json for structured
logs. Set the standard OTEL_EXPORTER_OTLP_ENDPOINT and traces export there too.
Every environment variable flusso reads — secrets, the FLUSSO_* flags, telemetry
— is collected in one place: CONFIG.md.
just commands
Common workflows are wrapped in the justfile. Run just for the
full menu; the greatest hits:
| Recipe | Does |
|---|---|
| just up / just down / just reset | Start / stop / wipe-and-restart the dev stack |
| just check / just check-offline | Validate config + schemas (with / without a DB) |
| just run / just run-live | Backfill + follow / resume live only |
| just build-lock | Compile a portable flusso.lock |
| just demo | Run flusso inside the cluster, no host toolchain |
| just status / just metrics / just eta | Live status / raw metrics / backlog drain ETA |
| just psql / just grafana | psql shell / open the Grafana dashboard |
| just test / just test-all / just doc | Fast tests / + Postgres e2e / doctests |
| just lint / just fmt / just ci | Lint / format / the full local CI gate |
Requirements
flusso doesn't own Postgres or OpenSearch — it's a guest in both. A few things
have to be true before it can run. The dev/ stack sets all of this up
for you; here's what you'd replicate against your own infrastructure. Full
per-source/per-sink options are in SOURCES_AND_SINKS.md.
Postgres (the source):
- PG 14+ with wal_level = logical (a restart-required setting), and max_wal_senders / max_replication_slots high enough for flusso plus any other consumers.
- A publication covering every table any index reads: root tables and every table a join or aggregate pulls from. flusso derives that set from your schema and manages the publication for you when the source role is privileged enough: it creates it on first connect and extends it as you add tables, exactly as it does the slot. The catch is privilege: creating or extending a publication needs table ownership plus CREATE on the database (or superuser), a stronger grant than the read-only role below. If the role can't, flusso doesn't fail: it logs the exact CREATE PUBLICATION / ALTER PUBLICATION … ADD TABLE to run, and flusso check prints the same. Set [source] manage_publication = false (or FLUSSO_MANAGE_PUBLICATION=false in the environment) to turn management off and manage the publication yourself.
- A replication slot: this one flusso always creates on first connect (it needs only the REPLICATION attribute). Heads-up: Postgres hoards WAL until flusso confirms it, so a flusso that's down for a long time means WAL piling up on the server. Drop the slot when you retire a deployment, unless you're a fan of disk-full pages.
- Row identity on every replicated table: a primary key (usual case) or an explicit REPLICA IDENTITY. Keyless tables can't be addressed, so flusso skips them in backfill and errors on a live change it can't key.
- A role with REPLICATION + SELECT on the published tables, via the usual postgres://user:pass@host:port/db URL. That's enough to stream and to create the slot; for flusso to also manage the publication (above) the role must own those tables and hold CREATE on the database. Otherwise flusso just prints the SQL for you to run with a privileged role.
OpenSearch (the sink):
- OpenSearch 2.x (also speaks Elasticsearch 7.x on the query side via flusso-query).
- A reachable HTTP(S) endpoint as the sink url. Optional HTTP basic auth (username / password); tls_verify defaults to true, so flip it off only for self-signed dev clusters.
- A user that can manage and write flusso's indexes plus the small hidden flusso_meta index where it records "this one's seeded". flusso owns the index lifecycle: it derives a strict typed mapping per schema and names each index from a hash of that schema (users_<hash>), so a structural change rolls onto a fresh index and re-seeds instead of fighting a mismatched one. The plain logical name (users) is kept as an alias on the current index, so you can always query it without knowing the hash.
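The naming scheme can be illustrated with a tiny deterministic hash. This is a sketch only: the README does not say which hash flusso uses or what exactly it hashes, so FNV-1a over the raw schema text is an assumption, and physical_index_name is a made-up helper.

```rust
/// 64-bit FNV-1a. Chosen here only because it is tiny and deterministic;
/// it is not claimed to be the hash flusso uses.
pub fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Physical index name for a logical index and its schema text,
/// in the `users_<hash>` shape. The logical name stays usable as an alias.
pub fn physical_index_name(logical: &str, schema: &str) -> String {
    format!("{}_{:016x}", logical, fnv1a64(schema.as_bytes()))
}
```

The same schema always maps to the same physical name, while any structural edit produces a new one; pointing the users alias at the newest name is what lets queries ignore the suffix.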
Deploying it
- Container image: the Dockerfile builds a registry-ready, config-less image (you mount a config or bake your own flusso.lock). It also has a demo target with the dev config baked in, which is what just demo runs. DEPLOY.md has the recipes for shipping the smallest possible image: bake your own lock, or compile one in-Docker even when your schemas are scattered across a monorepo.
- Kubernetes: the Helm chart deploys flusso as a single instance (it consumes one replication slot, so it's firmly a party of one) with config via ConfigMap, secrets via env, a Service, and an optional Prometheus ServiceMonitor. See its README.
Docs
| Doc | What's in it |
|---|---|
| SCHEMA.md | Every config + schema key: field types, joins, aggregates, filters, validation rules |
| SOURCES_AND_SINKS.md | Every source and sink option: batch sizes, retries, analysis modes |
| CONFIG.md | Every environment variable in one place: secrets, FLUSSO_* flags, logging & telemetry |
| CLIENT.md | flusso-query, the typed query-side client and its #[derive(FlussoDocument)] |
| DEPLOY.md | Docker recipes: smallest image, baking/compiling a flusso.lock, scoped .dockerignore |
| dev/README.md | The dev stack walk-through |
| deploy/helm/flusso/README.md | The Helm chart |
| CLAUDE.md | Architecture + contributor notes (also where the AI takes its instructions) |
Project layout
Crates live under libs/ and apps/. The numeric prefix is the dependency
layer — a crate only depends on lower-numbered ones.
| Crate | Path | Role |
|---|---|---|
| schema-core | libs/0-core | The validated domain model every other crate produces and consumes; the sole layer-0 crate. |
| schema-config-toml | libs/2-schema/1-config-toml | Parses flusso.toml → neutral entities. |
| schema-index-yaml | libs/2-schema/1-index-yaml | Parses *.schema.yml → core types. |
| schema | libs/2-schema | Config entry point. load() reads a config + its schemas into one validated Config. |
| queue-core | libs/1-queue/0-core | The work-queue abstraction, generic over the payload. |
| queue-channel | libs/1-queue/1-channel | In-process queue over a bounded tokio mpsc channel. |
| sources-core | libs/1-sources/0-core | Source abstractions: cdc (what changed?) and document (what to build?). |
| sources-postgres | libs/1-sources/1-postgres | Postgres source: WAL capture, backfill, document building. |
| sinks-core | libs/1-sinks/0-core | The Sink trait, JSON rendering, and a fan-out sink. |
| sinks-stdout | libs/1-sinks/1-stdout | Writes each operation to stdout (NDJSON or pretty). |
| sinks-opensearch | libs/1-sinks/2-opensearch | OpenSearch sink: bulk API, typed mappings, hashed index names + latest-alias, seeding markers. |
| engine | libs/2-engine | The sync engine: capture → queue → build → sink, batched, at-least-once. |
| daemon | libs/3-daemon | Wires a Config into a running pipeline and exposes live Status. |
| flusso-cli | apps/cli | The flusso binary: transport, telemetry, signals. |
Each config-format crate works in two stages — parse (serde → permissive
entity types, unknown fields rejected) then convert (lift into schema-core,
apply the rules the format can't express). Secrets are not resolved here, so a
compiled config never carries a secret it wasn't literally given.
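One way to picture "carried through, resolved later" is a secret that is either a literal or a named environment reference. The Secret enum and its resolve method are hypothetical, not the types in schema-core, and the environment is passed in as a map so the sketch stays testable.

```rust
use std::collections::HashMap;

/// A credential as it sits in a compiled config (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub enum Secret {
    /// A value that was written into the config literally.
    Literal(String),
    /// A reference such as `{ env = "DATABASE_URL" }`, kept as a name only.
    Env(String),
}

impl Secret {
    /// Resolve at run time against the given environment. Compiling never
    /// calls this, so an env reference stays a name inside the lock file.
    pub fn resolve(&self, env: &HashMap<String, String>) -> Result<String, String> {
        match self {
            Secret::Literal(value) => Ok(value.clone()),
            Secret::Env(name) => env
                .get(name)
                .cloned()
                .ok_or_else(|| format!("environment variable {} is not set", name)),
        }
    }
}
```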
Testing & development
Tests run with cargo-nextest
(cargo install cargo-nextest --locked):
The e2e tests are #[ignore]d by default and legitimately slow/flaky, so
.config/nextest.toml caps their concurrency and retries them. A few crates also
carry Criterion benchmarks
(cargo bench) — the engine, the OpenSearch sink, and the Postgres source.
Editor support: point your editor at
libs/2-schema/1-config-toml/config.schema.json and
libs/2-schema/1-index-yaml/index.schema.yml for completion and inline
validation — the bundled example schemas already reference them via a
yaml-language-server modeline.
Want to hack on it? CLAUDE.md has the architecture tour and the
house rules (the lints are strict — no unwrap, no println!, and yes, they fail
the build).
License
Licensed under the Apache License, Version 2.0. Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in this work shall be licensed as above, without any additional terms or conditions.