Crate engine


§flusso-engine

The flusso sync engine: drives changes from a source through to a sink, acknowledging each change at the source only once its documents are durable.

ChangeCapture ─▶ queue ─▶ resolve ─▶ build ─▶ Sink ─▶ flush ─▶ ack

§At a glance

| Edge (trait object) | Job | Swap to… |
|---|---|---|
| ChangeCapture | stream + snapshot the source | WAL, polling, … |
| queue | bounded buffer, back-pressures capture | channel, durable broker |
| DocumentBuilder | resolve a row → document ids, assemble each | per source |
| Sink | buffer, flush, ack, report seeding | stdout, OpenSearch |

| Invariant | What it buys you | Guarded by |
|---|---|---|
| At-least-once: acks confirmed only after the flush that persisted their docs | crash before flush → whole batch redelivered, re-applied idempotently | flush-then-confirm |
| Two-step resolve → build, deduped | a doc touched N times in a batch is built once | dedup per batch |
| Backfill is the sink's call | the destination decides what needs seeding, not the source | is_seeded per index |
| Item rejections vs flush errors | one poison doc doesn't have to stop the run | FailurePolicies |
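The "two-step resolve → build, deduped" invariant can be illustrated with a minimal sketch. It is not the crate's code: Change, resolve_then_build, and the closure-based resolve and build hooks are hypothetical names. Resolved ids are collected into a BTreeSet first, so a document touched several times in one batch is built exactly once, and in a deterministic order.

```rust
use std::collections::BTreeSet;

/// Hypothetical change record: the source row it names.
pub struct Change {
    pub table: &'static str,
    pub row_id: u64,
}

/// Step 1 (resolve): map every change in the batch to the document ids it
/// affects, collecting them into a set so repeats within the batch collapse.
/// Step 2 (build): build each distinct id once. Returns the number of builds.
pub fn resolve_then_build<R, B>(batch: &[Change], mut resolve: R, mut build: B) -> usize
where
    R: FnMut(&Change) -> Vec<String>,
    B: FnMut(&str),
{
    let mut ids = BTreeSet::new();
    for change in batch {
        ids.extend(resolve(change));
    }
    for id in &ids {
        build(id.as_str());
    }
    ids.len()
}
```

With four changes naming orders row 7 three times and row 8 once, only two documents are built. A HashSet would dedup equally well; the BTreeSet is chosen here only so build order is reproducible.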

§The loop

A capture task drains the source's change stream into a bounded in-process queue (back-pressure: capture blocks when the queue is full). A worker pulls changes and, for the row each change names, resolves the affected document ids, assembles each document, and writes it to the Sink's buffer.
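A minimal sketch of the capture → queue → worker hand-off, assuming std's bounded std::sync::mpsc::sync_channel as the in-process queue. The crate's queue is a trait object and may be implemented differently; Change, capture, worker, and run_pipeline are hypothetical names. SyncSender::send blocks while the channel already holds `capacity` items, which is exactly the back-pressure the capture task feels.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical change record: names the source row that changed.
#[derive(Debug, Clone, PartialEq)]
pub struct Change {
    pub table: String,
    pub row_id: u64,
}

/// Capture side: pushes changes into the bounded queue. `send` blocks while
/// the queue is full, so a slow worker throttles capture.
fn capture(changes: Vec<Change>, tx: SyncSender<Change>) {
    for change in changes {
        if tx.send(change).is_err() {
            break; // the worker hung up; stop capturing
        }
    }
    // `tx` is dropped here, closing the queue and ending the worker's loop.
}

/// Worker side: drains the queue until capture finishes.
fn worker(rx: Receiver<Change>) -> Vec<u64> {
    let mut seen = Vec::new();
    for change in rx {
        // In the engine this is where resolve → build → sink buffer happen.
        seen.push(change.row_id);
    }
    seen
}

/// Runs capture on its own thread and the worker on the current one.
pub fn run_pipeline(changes: Vec<Change>, capacity: usize) -> Vec<u64> {
    let (tx, rx) = sync_channel(capacity);
    let producer = thread::spawn(move || capture(changes, tx));
    let consumed = worker(rx);
    producer.join().expect("capture thread panicked");
    consumed
}
```

With a single producer the worker sees changes in capture order, whatever the capacity; a smaller capacity only means capture blocks more often.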

§Batching

Writes are batched: the worker groups up to BatchPolicy::max_changes changes (or whatever arrives within BatchPolicy::max_delay, whichever comes first) into a single flush, turning N changes into as few as ⌈N / max_changes⌉ bulk round-trips instead of N (exactly that many when every batch fills before max_delay expires).
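The "max_changes or max_delay, whichever comes first" rule can be sketched over a std mpsc receiver. This assumes BatchPolicy has just these two fields and that the delay is measured from the first change of the batch; both are illustrative assumptions, and next_batch and full_batch_round_trips are hypothetical helpers.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Assumed shape of the policy; the crate's BatchPolicy may carry more.
#[derive(Debug, Clone, Copy)]
pub struct BatchPolicy {
    pub max_changes: usize,
    pub max_delay: Duration,
}

/// Collects one batch: stops at `max_changes` items or once `max_delay` has
/// passed since the first item. Returns None when the queue is closed and empty.
pub fn next_batch<T>(rx: &Receiver<T>, policy: BatchPolicy) -> Option<Vec<T>> {
    // Block for the first change so an empty batch is never flushed.
    let first = rx.recv().ok()?;
    let deadline = Instant::now() + policy.max_delay;
    let mut batch = vec![first];
    while batch.len() < policy.max_changes {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(change) => batch.push(change),
            Err(RecvTimeoutError::Timeout) => break, // max_delay reached
            Err(RecvTimeoutError::Disconnected) => break, // capture finished
        }
    }
    Some(batch)
}

/// Flush count when every batch fills: ceil(n / max_changes).
pub fn full_batch_round_trips(n: usize, max_changes: usize) -> usize {
    n.div_ceil(max_changes)
}
```

For example, 1,000 changes with max_changes = 250 need 4 flushes when batches fill, and 1,001 changes need 5; a delay-triggered cut-off can only add flushes to that minimum.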

§At-least-once

The source acks for a batch are confirmed only after the flush that persisted their documents, so the replication slot advances past a change exactly when its documents are durable downstream. A crash before the flush leaves the whole batch unconfirmed, so it’s redelivered on restart and re-applied idempotently — documents are rebuilt from the current row and written by deterministic id.

Stopping on any error is therefore safe: unconfirmed changes are redelivered when the run restarts.
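The flush-then-confirm ordering, and why a crash at either point is safe, can be simulated with in-memory mocks. MockSource, MockSink, Crash, and run_batch are all hypothetical; the source's position stands in for a replication-slot position, and documents are keyed by a deterministic id so re-applying a redelivered batch leaves the same state.

```rust
use std::collections::HashMap;

/// Hypothetical source: each change has a position; everything at or below
/// `confirmed` is acked and never redelivered.
pub struct MockSource {
    log: Vec<(u64, u64)>, // (position, row_id)
    confirmed: u64,
}

impl MockSource {
    pub fn new(rows: &[u64]) -> Self {
        let log = rows.iter().enumerate().map(|(i, &row)| (i as u64 + 1, row)).collect();
        MockSource { log, confirmed: 0 }
    }

    /// What a (re)started run receives: every change past the confirmed position.
    pub fn pending(&self) -> Vec<(u64, u64)> {
        self.log.iter().copied().filter(|&(pos, _)| pos > self.confirmed).collect()
    }

    pub fn confirm(&mut self, pos: u64) {
        self.confirmed = self.confirmed.max(pos);
    }
}

/// Hypothetical sink keyed by deterministic doc id, so writes are idempotent.
#[derive(Default)]
pub struct MockSink {
    buffer: Vec<(String, u64)>,
    pub durable: HashMap<String, u64>,
}

impl MockSink {
    pub fn write(&mut self, row_id: u64) {
        self.buffer.push((format!("doc-{row_id}"), row_id));
    }
    pub fn flush(&mut self) {
        for (id, body) in self.buffer.drain(..) {
            self.durable.insert(id, body);
        }
    }
}

#[derive(Clone, Copy, PartialEq, Eq)]
pub enum Crash {
    Never,
    BeforeFlush,
    BeforeConfirm,
}

/// One batch under flush-then-confirm, with an optional simulated crash.
pub fn run_batch(source: &mut MockSource, sink: &mut MockSink, crash: Crash) {
    let batch = source.pending();
    for &(_, row_id) in &batch {
        sink.write(row_id);
    }
    if crash == Crash::BeforeFlush {
        sink.buffer.clear(); // the in-memory buffer dies with the process
        return;
    }
    sink.flush();
    if crash == Crash::BeforeConfirm {
        return; // docs are durable, but the source was never told
    }
    // Confirm only after the flush that persisted this batch's documents.
    if let Some(&(last_pos, _)) = batch.last() {
        source.confirm(last_pos);
    }
}
```

A crash before the flush leaves nothing durable and everything pending; a crash between flush and confirm redelivers the batch, and re-applying it overwrites the same three ids rather than adding new ones.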

§Mapping first

Before anything else, the engine asks the DocumentBuilder for each index’s resolved mapping and tells the sink to create it (ensure_index) — so the destination uses the configured field types instead of guessing. This is idempotent, so it runs on every start, including resumes.
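Idempotent index creation can be sketched as a "create if absent" check. The Mapping alias, InMemorySink, and apply_mappings are hypothetical; only the name ensure_index comes from the text above, and its real signature, error handling, and behaviour when an existing index's mapping differs are not described here.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical resolved mapping: (field name, field type) pairs.
pub type Mapping = Vec<(String, String)>;

/// Hypothetical in-memory sink that tracks which indexes exist.
#[derive(Default)]
pub struct InMemorySink {
    pub indexes: HashMap<String, Mapping>,
    pub creates: usize,
}

impl InMemorySink {
    /// Creates `name` with `mapping` if it does not exist yet, otherwise does
    /// nothing. Returns true only when the index was newly created.
    pub fn ensure_index(&mut self, name: &str, mapping: &Mapping) -> bool {
        match self.indexes.entry(name.to_string()) {
            Entry::Occupied(_) => false,
            Entry::Vacant(slot) => {
                slot.insert(mapping.clone());
                self.creates += 1;
                true
            }
        }
    }
}

/// Start-up step: create every index from its resolved mapping before any
/// document is written. Safe to repeat on resume because ensure_index is
/// idempotent.
pub fn apply_mappings(sink: &mut InMemorySink, mappings: &[(String, Mapping)]) {
    for (index, mapping) in mappings {
        sink.ensure_index(index, mapping);
    }
}
```

Running apply_mappings on a fresh start and again on a resume creates each index once; the second call is a no-op.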

§Backfill

Before live capture, the engine runs an optional backfill phase. It asks the DocumentBuilder which indexes exist and the sink whether each is already seeded; for those that aren’t, it asks the source to snapshot their root tables and drives that finite stream through the same queue → resolve → build → sink path (scoped to just the unseeded indexes), then records each as seeded. So “is a backfill needed?” is the destination’s call, not the source’s.
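The backfill decision can be sketched as a loop over (index, root table) pairs that consults the destination's seeding record. SeedState, backfill, and the drive_snapshot closure (a stand-in for "snapshot this root table and push it through queue → resolve → build → sink") are hypothetical. For simplicity the sketch handles one index at a time, whereas the engine may drive all unseeded indexes through a single pass.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the sink's per-index seeding record.
pub struct SeedState {
    seeded: HashSet<String>,
}

impl SeedState {
    pub fn new(already_seeded: &[&str]) -> Self {
        SeedState { seeded: already_seeded.iter().map(|s| s.to_string()).collect() }
    }
    pub fn is_seeded(&self, index: &str) -> bool {
        self.seeded.contains(index)
    }
    pub fn mark_seeded(&mut self, index: &str) {
        self.seeded.insert(index.to_string());
    }
}

/// Backfill phase over (index, root_table) pairs from the document builder.
/// Returns the indexes that were backfilled.
pub fn backfill<F>(indexes: &[(&str, &str)], state: &mut SeedState, mut drive_snapshot: F) -> Vec<String>
where
    F: FnMut(&str, &str),
{
    let mut backfilled = Vec::new();
    for &(index, root_table) in indexes {
        // The destination decides: already-seeded indexes are skipped.
        if state.is_seeded(index) {
            continue;
        }
        drive_snapshot(index, root_table);
        // Record seeding only after the snapshot has been driven through.
        state.mark_seeded(index);
        backfilled.push(index.to_string());
    }
    backfilled
}
```

A second start sees every index as seeded and snapshots nothing, which is what makes the phase safe to run on every start.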

💡 Did you know — the queue, source, sink, and document builder are all trait objects, so the backend choices (WAL vs polling, stdout vs OpenSearch, channel vs a durable broker) swap without touching this loop.
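A minimal sketch of why trait objects make backends swappable: the loop below only sees `dyn Sink`, so choosing stdout or another backend is a construction-time decision. The trimmed-down Sink trait, StdoutSink, MemorySink (an in-memory stand-in for a remote backend such as OpenSearch), drive, and make_sink are all hypothetical; the crate's real Sink also handles acks and seeding, and its methods may differ.

```rust
/// Hypothetical, trimmed-down sink trait.
pub trait Sink {
    fn write(&mut self, doc_id: &str, body: &str);
    /// Persists buffered documents and returns how many were flushed.
    fn flush(&mut self) -> usize;
}

#[derive(Default)]
pub struct StdoutSink {
    buffer: Vec<String>,
}

impl Sink for StdoutSink {
    fn write(&mut self, doc_id: &str, body: &str) {
        self.buffer.push(format!("{doc_id}\t{body}"));
    }
    fn flush(&mut self) -> usize {
        let n = self.buffer.len();
        for line in self.buffer.drain(..) {
            println!("{line}");
        }
        n
    }
}

/// In-memory stand-in for a remote backend.
#[derive(Default)]
pub struct MemorySink {
    buffer: Vec<(String, String)>,
    pub stored: Vec<(String, String)>,
}

impl Sink for MemorySink {
    fn write(&mut self, doc_id: &str, body: &str) {
        self.buffer.push((doc_id.to_string(), body.to_string()));
    }
    fn flush(&mut self) -> usize {
        let n = self.buffer.len();
        self.stored.append(&mut self.buffer);
        n
    }
}

/// The loop is written once against `dyn Sink`.
pub fn drive(sink: &mut dyn Sink, docs: &[(&str, &str)]) -> usize {
    for &(id, body) in docs {
        sink.write(id, body);
    }
    sink.flush()
}

pub fn make_sink(kind: &str) -> Box<dyn Sink> {
    match kind {
        "stdout" => Box::new(StdoutSink::default()),
        _ => Box::new(MemorySink::default()),
    }
}
```

The cost of this design is one dynamic dispatch per call, which is negligible next to a bulk network round-trip.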

§Module map

| Module | Holds |
|---|---|
| policy | run configuration: BatchPolicy, FailurePolicies |
| pipeline | the Pipeline execution machinery this Engine drives |
| observer | the progress trait |
| error | the error type |

Structs§

BatchPolicy
How the worker groups changes into one sink flush.
BatchStats
What one committed batch did — reported to Observer::on_batch_committed.
Engine
Drives changes from a source through to a sink.
FailurePolicies
How the engine resolves the FailurePolicy for a rejected document: a global default plus per-index overrides, keyed by logical index name.
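The "global default plus per-index overrides" lookup can be sketched with a HashMap keyed by logical index name. The Stop and Skip variants and the new, with_override, and resolve methods are hypothetical illustrations, not the crate's confirmed API.

```rust
use std::collections::HashMap;

/// Hypothetical variants for illustration; the crate's real enum may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailurePolicy {
    /// Stop the run on the first rejected document.
    Stop,
    /// Skip the rejected document and keep going.
    Skip,
}

pub struct FailurePolicies {
    default: FailurePolicy,
    per_index: HashMap<String, FailurePolicy>, // keyed by logical index name
}

impl FailurePolicies {
    pub fn new(default: FailurePolicy) -> Self {
        FailurePolicies { default, per_index: HashMap::new() }
    }

    pub fn with_override(mut self, index: &str, policy: FailurePolicy) -> Self {
        self.per_index.insert(index.to_string(), policy);
        self
    }

    /// A per-index override wins; otherwise the global default applies.
    pub fn resolve(&self, logical_index: &str) -> FailurePolicy {
        self.per_index.get(logical_index).copied().unwrap_or(self.default)
    }
}
```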
FanOut
An Observer that forwards every event to several observers in turn.
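A fan-out observer can be sketched as a Vec of boxed observers that each receive every event in turn. The single-event Observer trait (taking `&self`), the BatchStats fields, the with builder, and the DocCounter example observer are all assumptions for illustration; the crate's real trait has more events and its signatures are not shown here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical fields; the real BatchStats may report different numbers.
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
    pub changes: usize,
    pub docs_written: usize,
}

/// Trimmed-down Observer with a single event (signature assumed).
pub trait Observer {
    fn on_batch_committed(&self, stats: &BatchStats);
}

/// Forwards every event to each inner observer, in insertion order.
#[derive(Default)]
pub struct FanOut {
    observers: Vec<Box<dyn Observer>>,
}

impl FanOut {
    pub fn with(mut self, observer: impl Observer + 'static) -> Self {
        self.observers.push(Box::new(observer));
        self
    }
}

impl Observer for FanOut {
    fn on_batch_committed(&self, stats: &BatchStats) {
        for observer in &self.observers {
            observer.on_batch_committed(stats);
        }
    }
}

/// Example observer: adds documents written to a shared counter.
pub struct DocCounter(pub Arc<AtomicUsize>);

impl Observer for DocCounter {
    fn on_batch_committed(&self, stats: &BatchStats) {
        self.0.fetch_add(stats.docs_written, Ordering::Relaxed);
    }
}
```

Because FanOut itself implements Observer, the engine can hold one observer and stay unaware of how many listeners sit behind it.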
NoopObserver
The default Observer: every event is dropped. Used when an engine is run without with_observer.

Enums§

EngineError
A fatal error that stops the sync run. Because the engine confirms a change’s source ack only after the change is durably applied, stopping on an error is safe: unconfirmed changes are redelivered when the run restarts.
FailurePolicy
What the pipeline does when a sink rejects a document at the item level: it accepted the batch but refused a specific document (a mapping conflict, a malformed value). Distinct from a flush-wide failure, which always stops the run. Set globally on the config and overridable per index; both settings live in the schema crate's Config and Index, from which this policy is assembled.
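The split between item rejections and flush-wide failures can be sketched as a match on a flush outcome. FlushOutcome, handle_flush, the Stop and Skip variants, and this cut-down EngineError are hypothetical stand-ins, not the crate's types. A failed flush is always fatal; a rejected item is fatal only under a stopping policy.

```rust
/// Hypothetical variants for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailurePolicy {
    Stop,
    Skip,
}

/// Hypothetical outcome of one flush.
pub enum FlushOutcome {
    /// The sink accepted the batch but refused these document ids.
    Accepted { rejected: Vec<String> },
    /// The whole flush failed, for example because the connection dropped.
    Failed(String),
}

/// Hypothetical, cut-down error type.
#[derive(Debug, PartialEq)]
pub enum EngineError {
    Flush(String),
    Rejected(String),
}

/// Returns the ids that were skipped, or a fatal error.
pub fn handle_flush(outcome: FlushOutcome, policy: FailurePolicy) -> Result<Vec<String>, EngineError> {
    match outcome {
        // A flush-wide failure stops the run regardless of policy.
        FlushOutcome::Failed(reason) => Err(EngineError::Flush(reason)),
        FlushOutcome::Accepted { rejected } => match (policy, rejected.first().cloned()) {
            // Under Stop, the first rejected document ends the run.
            (FailurePolicy::Stop, Some(id)) => Err(EngineError::Rejected(id)),
            // No rejections, or Skip: report what was skipped and continue.
            _ => Ok(rejected),
        },
    }
}
```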

Traits§

Observer
A sink for the engine’s lifecycle and progress events.

Type Aliases§

Result
Result alias for the engine.