§flusso-engine
The flusso sync engine: drives changes from a source through to a sink, acknowledging each one only once it is durable.
ChangeCapture ─▶ queue ─▶ resolve ─▶ build ─▶ Sink ─▶ flush ─▶ ack
§At a glance
| Edge (trait object) | Job | Swap to… |
|---|---|---|
| ChangeCapture | stream + snapshot the source | WAL, polling, … |
| queue | bounded buffer, back-pressures capture | channel, durable broker |
| DocumentBuilder | resolve a row → document ids, assemble each | per source |
| Sink | buffer, flush, ack, report seeding | stdout, OpenSearch |
| Invariant | What it buys you | Guarded by |
|---|---|---|
| At-least-once — acks confirmed only after the flush that persisted their docs | crash before flush → whole batch redelivered, re-applied idempotently | flush-then-confirm |
| Two-step resolve → build, deduped | a doc touched N times in a batch is built once | dedup per batch |
| Backfill is the sink’s call | the destination decides what needs seeding, not the source | is_seeded per index |
| Item rejections vs flush errors | one poison doc doesn’t have to stop the run | FailurePolicies |
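The "resolve → build, deduped" invariant can be illustrated with a small sketch. This is not the crate's implementation: `RowChange`, `resolve_ids`, and `build_batch` are hypothetical names, and the resolve rule is invented purely for the example.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for one captured row change.
struct RowChange {
    table: &'static str,
    key: u64,
}

/// Step 1 (resolve): map a changed row to the ids of the documents it affects.
/// The rule is invented for illustration: an `orders` row touches its own
/// document, any other row touches the document of the order it belongs to.
fn resolve_ids(change: &RowChange) -> Vec<String> {
    match change.table {
        "orders" => vec![format!("order-{}", change.key)],
        _ => vec![format!("order-{}", change.key / 10)],
    }
}

/// Step 2 (build): gather the ids for the whole batch, dedupe them, and build
/// each document once. Returns the ids that would be built, in sorted order.
fn build_batch(changes: &[RowChange]) -> Vec<String> {
    let ids: BTreeSet<String> = changes.iter().flat_map(resolve_ids).collect();
    ids.into_iter().collect()
}
```

Four changes that all point at two orders produce two builds, which is the saving the table describes.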
§The loop
A capture task drains the source’s change stream into a bounded
in-process queue (back-pressure: capture blocks when the
queue is full). A worker pulls changes and, for the row each names,
resolves the affected document ids, assembles each one, and writes it to the
Sink’s buffer.
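The capture/worker split with back-pressure can be sketched using only the standard library. This assumes a `std::sync::mpsc::sync_channel` as the bounded queue; the engine's own queue is a trait object and may be backed by something else. `run_loop` and `queue_is_full_after` are hypothetical helpers.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Runs a capture thread and a worker joined by a bounded queue of `capacity`.
/// Returns the changes in the order the worker saw them.
fn run_loop(changes: Vec<u32>, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let capture = thread::spawn(move || {
        for change in changes {
            // `send` blocks while the queue is full: this is the back-pressure.
            if tx.send(change).is_err() {
                break; // the worker has gone away
            }
        }
        // `tx` is dropped here, which ends the worker's loop below.
    });
    let mut seen = Vec::new();
    for change in rx {
        seen.push(change); // a real worker would resolve, build, and write here
    }
    capture.join().expect("capture thread panicked");
    seen
}

/// Shows that a full bounded queue refuses further items until it is drained.
fn queue_is_full_after(capacity: usize) -> bool {
    let (tx, _rx) = sync_channel::<u32>(capacity);
    for i in 0..capacity as u32 {
        tx.try_send(i).expect("queue has room");
    }
    matches!(tx.try_send(0), Err(TrySendError::Full(_)))
}
```

A small capacity keeps memory bounded at the cost of stalling capture sooner when the sink is slow.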
§Batching
Writes are batched: the worker groups up to BatchPolicy::max_changes
changes (or whatever arrives within BatchPolicy::max_delay, whichever
comes first) into a single flush. Under sustained load, N changes therefore
cost about ⌈N / max_changes⌉ bulk round-trips instead of N.
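A minimal sketch of that grouping rule, assuming the delay is measured from the first change of each batch (the text does not say where the clock starts). The `BatchPolicy` struct here is a hypothetical mirror of the two knobs named above, and `next_batch` is an illustrative helper, not the crate's API.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical mirror of the two knobs; the real `BatchPolicy` may differ.
struct BatchPolicy {
    max_changes: usize,
    max_delay: Duration,
}

/// Collects one batch: blocks for the first change, then keeps taking changes
/// until `max_changes` is reached or `max_delay` has elapsed since the first.
/// Returns `None` once the queue is closed and empty.
fn next_batch<T>(rx: &Receiver<T>, policy: &BatchPolicy) -> Option<Vec<T>> {
    let first = rx.recv().ok()?;
    let deadline = Instant::now() + policy.max_delay;
    let mut batch = vec![first];
    while batch.len() < policy.max_changes {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(change) => batch.push(change),
            // Deadline hit or queue closed: flush what has been gathered.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    Some(batch)
}
```

With five queued changes and `max_changes = 2`, this yields three batches, matching ⌈5 / 2⌉ = 3.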
§At-least-once
The source acks for a batch are confirmed only after the flush that persisted their documents, so the replication slot advances past a change exactly when its documents are durable downstream. A crash before the flush leaves the whole batch unconfirmed, so it’s redelivered on restart and re-applied idempotently — documents are rebuilt from the current row and written by deterministic id.
Stopping on any error is therefore safe: unconfirmed changes are redelivered when the run restarts.
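The flush-then-confirm ordering can be sketched with an in-memory stand-in. `MemorySink`, `apply_batch`, and the `fail_next_flush` switch are hypothetical and exist only to simulate a failure; the point is that the confirmed position moves only after `flush` succeeds, and that writing by deterministic id makes a redelivered batch harmless.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory sink: a document is "durable" once flushed.
#[derive(Default)]
struct MemorySink {
    buffer: Vec<(String, String)>,
    durable: BTreeMap<String, String>,
    fail_next_flush: bool,
}

impl MemorySink {
    fn write(&mut self, id: &str, doc: &str) {
        self.buffer.push((id.to_string(), doc.to_string()));
    }

    fn flush(&mut self) -> Result<(), String> {
        if self.fail_next_flush {
            self.fail_next_flush = false;
            self.buffer.clear(); // simulate a crash losing the buffered writes
            return Err("flush failed".to_string());
        }
        for (id, doc) in self.buffer.drain(..) {
            self.durable.insert(id, doc); // keyed by id, so re-applying overwrites
        }
        Ok(())
    }
}

/// Applies one batch of `(position, id, document)` entries and returns the new
/// confirmed position. Nothing is confirmed unless the flush succeeds.
fn apply_batch(
    sink: &mut MemorySink,
    confirmed: u64,
    batch: &[(u64, &str, &str)],
) -> Result<u64, String> {
    let mut last = confirmed;
    for &(position, id, doc) in batch {
        sink.write(id, doc);
        last = position;
    }
    sink.flush()?; // on error, return before confirming anything
    Ok(last)
}
```

If the first attempt fails, the caller still holds the old confirmed position, so the same batch is delivered again and lands on the same ids.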
§Mapping first
Before anything else, the engine asks the DocumentBuilder for each
index’s resolved mapping and tells the sink to create it
(ensure_index) — so the destination uses the
configured field types instead of guessing. This is idempotent, so it runs
on every start, including resumes.
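A sketch of why running this step on every start is harmless. `Destination` and `ensure_all` are hypothetical; only the name `ensure_index` comes from the text, and its real signature is not shown there, so the one below is an assumption.

```rust
use std::collections::BTreeMap;

/// Hypothetical destination that remembers each index's field types.
#[derive(Default)]
struct Destination {
    indexes: BTreeMap<String, BTreeMap<String, String>>,
    creates: usize,
}

impl Destination {
    /// Creates the index with the given mapping if it is missing;
    /// does nothing when the index already exists.
    fn ensure_index(&mut self, name: &str, mapping: &BTreeMap<String, String>) {
        if !self.indexes.contains_key(name) {
            self.indexes.insert(name.to_string(), mapping.clone());
            self.creates += 1;
        }
    }
}

/// Start-up step: ensure every index before any document is written.
fn ensure_all(
    dest: &mut Destination,
    mappings: &BTreeMap<String, BTreeMap<String, String>>,
) {
    for (name, mapping) in mappings {
        dest.ensure_index(name, mapping);
    }
}
```

Calling `ensure_all` on a fresh start and again on a resume creates each index once.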
§Backfill
Before live capture, the engine runs an optional backfill phase. It asks
the DocumentBuilder which indexes exist and the sink whether each is
already seeded; for those that aren’t, it asks the source to
snapshot their root tables and drives that
finite stream through the same queue → resolve → build → sink path (scoped to
just the unseeded indexes), then records each as seeded. So “is a backfill
needed?” is the destination’s call, not the source’s.
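The decision can be sketched as a filter over the known indexes. `SeedState` and `backfill` are hypothetical; `is_seeded` is named in the table above, but its real signature is an assumption here, and the `snapshot` closure stands in for driving the source's snapshot through the pipeline.

```rust
use std::collections::BTreeSet;

/// Hypothetical view of the sink's seeding state.
struct SeedState {
    seeded: BTreeSet<String>,
}

impl SeedState {
    fn is_seeded(&self, index: &str) -> bool {
        self.seeded.contains(index)
    }

    fn mark_seeded(&mut self, index: &str) {
        self.seeded.insert(index.to_string());
    }
}

/// Snapshots every index the sink reports as unseeded, records each as seeded,
/// and returns the names that were backfilled on this start.
fn backfill(
    all_indexes: &[&str],
    sink: &mut SeedState,
    mut snapshot: impl FnMut(&str),
) -> Vec<String> {
    let unseeded: Vec<String> = all_indexes
        .iter()
        .copied()
        .filter(|index| !sink.is_seeded(index))
        .map(str::to_string)
        .collect();
    for index in &unseeded {
        snapshot(index); // a real run streams the root tables through the pipeline
        sink.mark_seeded(index);
    }
    unseeded
}
```

A second start finds nothing unseeded, so the phase is skipped without the source being consulted.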
💡 Did you know — the queue, source, sink, and document builder are all trait objects, so the backend choices (WAL vs polling, stdout vs OpenSearch, channel vs a durable broker) swap without touching this loop.
§Module map
| Module | Holds |
|---|---|
| policy | run configuration: BatchPolicy, FailurePolicies |
| pipeline | the Pipeline execution machinery this Engine drives |
| observer | the progress trait |
| error | the error type |
Structs§
- BatchPolicy: How the worker groups changes into one sink flush.
- BatchStats: What one committed batch did, reported to Observer::on_batch_committed.
- Engine: Drives changes from a source through to a sink.
- FailurePolicies: How the engine resolves the FailurePolicy for a rejected document: a global default plus per-index overrides, keyed by logical index name.
- FanOut: An Observer that forwards every event to several observers in turn.
- NoopObserver: The default Observer: every event is dropped. Used when an engine is run without with_observer.
Enums§
- EngineError: A fatal error that stops the sync run. Because the engine confirms a change’s source ack only after the change is durably applied, stopping on an error is safe: unconfirmed changes are redelivered when the run restarts.
- FailurePolicy: What the pipeline does when a sink rejects a document at the item level, meaning it accepted the batch but refused a specific document (a mapping conflict, a malformed value). Distinct from a flush-wide failure, which always stops the run. Set globally on the config and overridable per index (both live in the schema crate’s Config / Index, which assemble this policy).
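The "global default plus per-index overrides" lookup can be sketched as a map with a fallback. The variant names `Stop` and `Skip`, the field names, and `for_index` are all hypothetical; the page does not list the real variants or methods.

```rust
use std::collections::HashMap;

/// Hypothetical variants; the real `FailurePolicy` may offer different choices.
#[derive(Clone, Copy, Debug, PartialEq)]
enum FailurePolicy {
    Stop,
    Skip,
}

/// Hypothetical shape: one default plus overrides keyed by logical index name.
struct FailurePolicies {
    default: FailurePolicy,
    per_index: HashMap<String, FailurePolicy>,
}

impl FailurePolicies {
    /// Returns the override for `index` if one is set, else the global default.
    fn for_index(&self, index: &str) -> FailurePolicy {
        self.per_index.get(index).copied().unwrap_or(self.default)
    }
}
```

Keying by logical index name keeps the policy independent of whatever physical name the destination uses.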
Traits§
- Observer: A sink for the engine’s lifecycle and progress events.
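How a fan-out observer and a no-op observer fit behind one trait can be sketched as follows. The trait below is a deliberately reduced, hypothetical version: it has a single method taking a document count, whereas the real `Observer` reports a `BatchStats` and has other events whose signatures are not shown on this page. `Counter` is an illustrative test observer.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Reduced, hypothetical observer trait with a single event.
trait Observer {
    fn on_batch_committed(&self, documents: usize);
}

/// Forwards every event to each inner observer in turn.
struct FanOut {
    observers: Vec<Box<dyn Observer>>,
}

impl Observer for FanOut {
    fn on_batch_committed(&self, documents: usize) {
        for observer in &self.observers {
            observer.on_batch_committed(documents);
        }
    }
}

/// Drops every event.
struct NoopObserver;

impl Observer for NoopObserver {
    fn on_batch_committed(&self, _documents: usize) {}
}

/// Illustrative observer that adds up the documents it is told about.
struct Counter(Rc<Cell<usize>>);

impl Observer for Counter {
    fn on_batch_committed(&self, documents: usize) {
        self.0.set(self.0.get() + documents);
    }
}
```

Because `FanOut` itself implements the trait, the engine only ever needs to hold one observer.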
Type Aliases§
- Result: Result alias for the engine.