# Rillflow
Rillflow is a lightweight document and event store for Rust applications, backed by PostgreSQL. It provides a JSONB document store, append-only event streams with optimistic concurrency, projection scaffolding, and developer tracing breadcrumbs that can export to Mermaid diagrams.
## Quickstart

## Integration Tests (requires Docker)
## CLI

- Schema
  - Tenants (schema-per-tenant)
- Projections admin
  - Run until idle (all projections or one)
  - DLQ admin
  - Metrics
  - Run loop (long-running)
  - Metrics endpoint (served on the same bind as health)
- Streams
- Documents admin
  - Read JSON
  - Soft-delete / restore
  - Index advisor (DDL suggestions)
- Snapshots
- Tenants health check
  - Drift detection across all known tenants
  - Single tenant check

Feature flag: the CLI is gated behind the `cli` feature. Enable it when building or running, e.g. `cargo run --features cli -- <command>`.
## Features
- JSONB document store with optimistic versioning
- Document session unit-of-work with identity map + staged writes
- LINQ-like document query DSL (filters, sorting, paging, projections)
- Composable compiled queries for cached predicates and reuse
- Event streams with expected-version checks
- Envelopes: headers, causation_id, correlation_id, created_at (API: read envelopes, append with headers)
- Projection replay and checkpointing helpers
- Projection runtime (daemon-ready primitives): per-projection checkpoints, leases, DLQ; CLI admin
- Developer tracing breadcrumbs with Mermaid export (dev-only)
- Integration test harness using Testcontainers (Docker required)
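Several of the features above hinge on optimistic concurrency. The idea can be sketched in a few lines of plain Rust; the names here are illustrative, not Rillflow's API:

```rust
// Minimal sketch of an expected-version check on append: the writer states
// the stream version it last saw, and a mismatch rejects the append.
#[derive(Debug, PartialEq)]
enum AppendError {
    VersionConflict { expected: u64, actual: u64 },
}

struct Stream {
    version: u64,
    events: Vec<String>,
}

impl Stream {
    fn append(&mut self, expected: u64, event: &str) -> Result<u64, AppendError> {
        if self.version != expected {
            return Err(AppendError::VersionConflict { expected, actual: self.version });
        }
        self.events.push(event.to_string());
        self.version += 1;
        Ok(self.version)
    }
}

fn main() {
    let mut s = Stream { version: 0, events: Vec::new() };
    assert_eq!(s.append(0, "created"), Ok(1));
    // a stale writer still expecting version 0 is rejected
    assert!(s.append(0, "renamed").is_err());
    println!("ok");
}
```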
## Store builder

```rust
use std::time::Duration;

// Connection string and tuning values are illustrative.
let store = Store::builder("postgres://localhost/app")
    .max_connections(16)
    .connect_timeout(Duration::from_secs(5))
    .build()
    .await?;
```
## Append with options (headers/causation/correlation)

```rust
use serde_json::json;
use uuid::Uuid;

// Field and argument values are illustrative.
let opts = AppendOptions {
    headers: Some(json!({ "source": "checkout" })),
    causation_id: Some(Uuid::new_v4()),
    correlation_id: Some(Uuid::new_v4()),
    ..Default::default()
};
store
    .events
    .append_with(stream_id, &events, opts)
    .await?;
```
Idempotent appends:

```rust
store
    .events
    .builder(stream_id)
    .idempotency_key("order-42-created") // key value is illustrative
    .push(event)
    .send()
    .await?;
// a second call with the same idempotency key will return Error::IdempotencyConflict
```
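The dedup behavior behind idempotency keys can be pictured with a plain set of seen keys; this is a conceptual sketch, not Rillflow's storage:

```rust
use std::collections::HashSet;

// Sketch of idempotency-key dedup: record each key, reject repeats.
#[derive(Debug, PartialEq)]
enum AppendResult {
    Appended,
    IdempotencyConflict,
}

struct Log {
    seen: HashSet<String>,
    events: Vec<String>,
}

impl Log {
    fn append(&mut self, key: &str, event: &str) -> AppendResult {
        // HashSet::insert returns false if the key was already present
        if !self.seen.insert(key.to_string()) {
            return AppendResult::IdempotencyConflict;
        }
        self.events.push(event.to_string());
        AppendResult::Appended
    }
}

fn main() {
    let mut log = Log { seen: HashSet::new(), events: Vec::new() };
    assert_eq!(log.append("order-42", "created"), AppendResult::Appended);
    // retrying the same logical append is rejected, leaving one event stored
    assert_eq!(log.append("order-42", "created"), AppendResult::IdempotencyConflict);
    assert_eq!(log.events.len(), 1);
    println!("ok");
}
```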
## Subscriptions (polling)
Create a subscription with filters, then tail events.
Programmatic:
```rust
// Constructor and option values are illustrative.
let subs = Subscriptions::new_with_schema(pool.clone(), "public");
let filter = SubscriptionFilter::default();
let opts = SubscriptionOptions::default();
let (handle, mut rx) = subs.subscribe(filter, opts).await?;
while let Some(event) = rx.recv().await {
    // handle the event
}
```
Consumer groups (checkpoint + leasing per group):
```rust
// Constructor and group name are illustrative.
let subs = Subscriptions::new_with_schema(pool.clone(), "public");
let filter = SubscriptionFilter::default();
let mut opts = SubscriptionOptions::default();
opts.group = Some("read-models".into());
let (handle, mut rx) = subs.subscribe(filter, opts).await?;
```
CLI tail with group:

- Group admin output includes `last_seq`, head, and lag per group for quick capacity checks.
- Backpressure can be tuned per group.
Manual ack mode (explicit checkpointing):
```rust
// Sketch of manual checkpointing; the handle/ack names are illustrative.
let subs = Subscriptions::new_with_schema(pool.clone(), "public");
let filter = SubscriptionFilter::default();
let mut opts = SubscriptionOptions::default();
opts.ack_mode = AckMode::Manual; // disable auto checkpointing
let (handle, mut rx) = subs.subscribe(filter, opts).await?;
while let Some(event) = rx.recv().await {
    // process the event, then checkpoint explicitly
    handle.ack(event.seq).await?;
}
```
Transactional ack (exactly-once-ish):
```rust
// Sketch: checkpoint inside the same DB transaction that applies the
// read-model update, so both commit atomically (names are illustrative).
let mut opts = SubscriptionOptions::default();
opts.ack_mode = AckMode::Manual; // we will ack inside our DB tx
let (handle, mut rx) = subs.subscribe(filter, opts).await?;
while let Some(event) = rx.recv().await {
    let mut tx = pool.begin().await?;
    // apply read-model changes via &mut *tx here
    handle.ack_in_tx(&mut tx, event.seq).await?;
    tx.commit().await?;
}
```
## Aggregates
Fold streams into domain state with a simple trait and repository.
```rust
use uuid::Uuid;

// Constructor and commit arguments are illustrative.
let repo = AggregateRepository::<Counter>::new(store.clone());
let id = Uuid::new_v4();
repo.commit(id, expected_version, &events).await?;
let agg: Counter = repo.load(id).await?;
```
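The fold that `load` performs can be sketched in plain Rust; the trait shape below is illustrative, not Rillflow's exact `Aggregate` trait:

```rust
// Replay a stream's events through apply() to rebuild aggregate state.
trait Aggregate: Default {
    type Event;
    fn apply(&mut self, event: &Self::Event);
}

#[derive(Default)]
struct Counter {
    value: i64,
}

enum CounterEvent {
    Incremented(i64),
    Decremented(i64),
}

impl Aggregate for Counter {
    type Event = CounterEvent;
    fn apply(&mut self, event: &CounterEvent) {
        match event {
            CounterEvent::Incremented(n) => self.value += n,
            CounterEvent::Decremented(n) => self.value -= n,
        }
    }
}

// "load": fold the ordered event history into fresh state.
fn load(events: &[CounterEvent]) -> Counter {
    let mut agg = Counter::default();
    for e in events {
        agg.apply(e);
    }
    agg
}

fn main() {
    let events = [CounterEvent::Incremented(5), CounterEvent::Decremented(2)];
    assert_eq!(load(&events).value, 3);
    println!("ok");
}
```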
## Stream Aliases
Resolve a human-friendly alias to a Uuid, creating it on first use.
```rust
// Alias string is illustrative.
let id = store.resolve_stream_alias("orders/42").await?;
store
    .events
    .append_stream(id, expected_version, &events)
    .await?;
```
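Get-or-create alias resolution can be pictured with a plain map; this sketch uses `u64` ids instead of Uuids to stay dependency-free:

```rust
use std::collections::HashMap;

// Conceptual sketch: the first use of an alias creates an id,
// and every later lookup returns the same one.
struct AliasTable {
    next: u64,
    map: HashMap<String, u64>,
}

impl AliasTable {
    fn resolve(&mut self, alias: &str) -> u64 {
        if let Some(&id) = self.map.get(alias) {
            return id;
        }
        let id = self.next;
        self.next += 1;
        self.map.insert(alias.to_string(), id);
        id
    }
}

fn main() {
    let mut t = AliasTable { next: 1, map: HashMap::new() };
    let a = t.resolve("orders/42");
    let b = t.resolve("orders/42");
    assert_eq!(a, b); // the same alias resolves to the same id
    assert_ne!(a, t.resolve("orders/43")); // a new alias gets a new id
    println!("ok");
}
```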
Append builder and validator hook:
```rust
use serde_json::json;

// Fluent append with headers/ids and batching (argument values illustrative)
store
    .events
    .builder(stream_id)
    .headers(json!({ "source": "api" }))
    .push(event)
    .expected(3)
    .send()
    .await?;

// Optional pre-commit validator (receives aggregate state as JSON)
let repo = AggregateRepository::<Counter>::new(store.clone())
    .with_validator(|state| {
        // reject the commit if an invariant would be violated
        Ok(())
    });
```
## Snapshots
Persist aggregate state every N events to speed up loads.
```rust
// write snapshot every 100 events (argument shape is illustrative)
repo.commit_and_snapshot(id, expected_version, &events, 100).await?;
// fast load using snapshot + tail
let agg: Counter = repo.load_with_snapshot(id).await?;
```
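Why this is faster: restore the saved state, then fold only the events appended after the snapshot's version. A self-contained sketch (not Rillflow's internals):

```rust
struct Counter {
    value: i64,
    version: u64,
}

// Fold a tail of events (here plain deltas) onto restored state.
fn fold(mut state: Counter, events: &[i64]) -> Counter {
    for delta in events {
        state.value += delta;
        state.version += 1;
    }
    state
}

fn main() {
    // snapshot captured at version 100
    let snapshot = Counter { value: 100, version: 100 };
    // only the events after the snapshot need replaying
    let tail = [1, 1, 1];
    let agg = fold(snapshot, &tail);
    assert_eq!(agg.value, 103);
    assert_eq!(agg.version, 103);
    println!("ok");
}
```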
Programmatic snapshotter (background compaction):
```rust
use std::sync::Arc;

// Constructor names are illustrative sketches.
// For an aggregate `Counter` implementing Aggregate + Serialize
let repo = Arc::new(AggregateRepository::<Counter>::new(store.clone()));
let folder = CounterFolder::new();
let snap = Snapshotter::new(repo, folder);
snap.run_until_idle().await?;
```
## Document Queries
```rust
// Sketch of the query DSL (method names illustrative)
let results = store.docs.query::<Customer>()
    .filter("status", "active")
    .fetch().await?;
```
## Document Repository (OCC and soft delete)
```rust
use uuid::Uuid;

let id = Uuid::new_v4();
// put returns new version (starts at 1)
let v1 = store.docs.put(id, &customer).await?;
// get returns (doc, version)
let (doc, version) = store.docs.get::<Customer>(id).await?.unwrap();
// update with optimistic concurrency
let v2 = store.docs.update(id, &doc, version).await?;
// soft delete / restore (programmatic or via CLI; method names illustrative)
store.docs.soft_delete(id).await?;
store.docs.restore(id).await?;
```
Partial updates (jsonb_set):
```rust
use serde_json::json;

// set a single field (path and value are illustrative)
store.docs.patch(id, "email", json!("a@example.com")).await?;
// set multiple fields in one statement
store.docs.patch_fields(id, &[("email", json!("a@example.com")), ("active", json!(true))]).await?;
```
See MIGRATIONS.md for guidance on adding workload-specific JSONB indexes for query performance.
## Projections Runtime (daemon primitives)

A minimal runtime that processes events into read models, with leases, backoff, a DLQ, and an admin CLI.
Programmatic usage:
```rust
use std::sync::Arc;

// Sketch of a handler; the trait's exact signature is illustrative.
struct CustomerList;

#[async_trait::async_trait]
impl ProjectionHandler for CustomerList {
    async fn apply(&self, event: &EventEnvelope) -> Result<(), Error> {
        // update the read-model rows for this event
        Ok(())
    }
}
```
See runnable example: examples/projection_run_once.rs.
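Per-projection checkpoints are what make replays safe: each run applies only events past the stored checkpoint, so reprocessing a batch is a no-op. A self-contained sketch of the idea (not Rillflow's internals):

```rust
// Apply events past the checkpoint once, advancing the checkpoint as we go.
// Events are (sequence number, delta) pairs; the read model is a running total.
fn run_once(events: &[(u64, i64)], checkpoint: &mut u64, total: &mut i64) {
    for (seq, delta) in events {
        if *seq <= *checkpoint {
            continue; // already applied
        }
        *total += delta;
        *checkpoint = *seq;
    }
}

fn main() {
    let events = [(1, 10), (2, 5), (3, -3)];
    let (mut checkpoint, mut total) = (0u64, 0i64);

    run_once(&events, &mut checkpoint, &mut total);
    assert_eq!((checkpoint, total), (3, 12));

    // replaying the same batch changes nothing thanks to the checkpoint
    run_once(&events, &mut checkpoint, &mut total);
    assert_eq!((checkpoint, total), (3, 12));
    println!("ok");
}
```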
Builder usage and idle runner:
```rust
use std::sync::Arc;

// Builder argument values are illustrative.
let daemon = ProjectionDaemon::builder(pool.clone())
    .schema("public")
    .batch_size(500)
    .register("customer_list", Arc::new(CustomerList))
    .build();
daemon.run_until_idle().await?;
```
```rust
// Long-running loop with graceful shutdown and optional NOTIFY wakeups
// (the token type and spawn body are illustrative).
use tokio_util::sync::CancellationToken;

let stop = CancellationToken::new();
let stop2 = stop.clone();
tokio::spawn(async move {
    tokio::signal::ctrl_c().await.ok();
    stop2.cancel();
});
daemon.run_loop(stop, true).await?; // true = use LISTEN/NOTIFY (channel defaults to rillflow_events)
```
Advisory locks (optional) for append:
```rust
store
    .events
    .with_advisory_locks()
    .append_stream(stream_id, expected_version, &events)
    .await?;
```
## Choosing your defaults

- Single-tenant vs multi-tenant: use `SchemaConfig::single_tenant()` for `public`, or `TenancyMode::SchemaPerTenant` to create per-tenant schemas via the CLI/API.
- Projection schema: set `ProjectionDaemon::builder(...).schema("app")` if you don't use `public`.
- Advisory locks: keep projection lease locks on (default). Enable append advisory locks only if you see writer contention on streams.
- Indexes: start with the default GIN on `doc`, then add expression indexes for hot paths (emails, timestamps, numeric ranges). See MIGRATIONS.md for examples.
- Examples: see examples/projection_run_once.rs for a runnable end-to-end projection demo.
## Compiled Queries
```rust
// Sketch (names illustrative): compile a predicate once, reuse it across calls.
let active = CompiledQuery::<Customer>::new(|q| q.filter("status", "active"));
let results = store.docs.run(&active).await?;
```
## Document Sessions & Aggregates
```rust
use uuid::Uuid;

// Builder and session method arguments are illustrative.
let store = Store::builder("postgres://localhost/app")
    .session_defaults(SessionDefaults::default())
    .session_advisory_locks(true)
    .build()
    .await?;

// Store is cheap to clone and share. Configure defaults up front so every clone
// produces sessions with consistent metadata.
let mut session = store.session();

let customer = Customer { id: Uuid::new_v4(), name: "Acme".into() };
session.store(&customer)?;
session.enqueue_event(customer.id, &event)?;

let repo = AggregateRepository::<Counter>::new(store.clone());
let mut aggregates = session.aggregates(&repo);
aggregates.commit(id, expected_version, &events)?;
aggregates.commit_and_snapshot(id, expected_version, &events, 100)?;

session.save_changes().await?;
```
## Multi-Tenancy (Schema Per Tenant)
```rust
// Builder and tenant names are illustrative.
let store = Store::builder("postgres://localhost/app")
    .tenant_strategy(TenantStrategy::SchemaPerTenant)
    .build()
    .await?;

store.ensure_tenant("acme").await?;
store.ensure_tenant("globex").await?;
// ensure_tenant() is idempotent and guarded by a Postgres advisory lock so
// concurrent app instances won't double-run migrations. Once a tenant is
// provisioned it is cached in-process for fast session spins.

let mut acme = store.session();
acme.context_mut().tenant = Some("acme".into());
acme.store(&customer)?;
acme.save_changes().await?;

let mut globex = store.session();
globex.context_mut().tenant = Some("globex".into());
assert!(globex.save_changes().await.is_ok());
// If you forget to provision a tenant before calling save_changes(), the
// session will fail fast with rillflow::Error::TenantNotFound("tenant_acme").
```
### Upgrading from single-tenant to schema-per-tenant

1. Prerequisites
   - Ensure the base schema (`public` by default) is fully migrated (schema-sync).
   - Take a database backup.
   - Identify the tenant identifiers you plan to introduce (e.g. `acme`, `globex`).
2. Run schema provisioning
   - Create the per-tenant schemas using the CLI or API.
3. Backfill data
   - For each tenant, copy existing documents/events into the new schema (custom SQL/backfill job).
   - Use the snapshots/export helpers if you plan to archive or migrate historical tenants.
4. Validate drift
   - Run the health command to make sure each tenant schema matches the latest migrations.
   - Fix any drift before enabling the new strategy.
5. Enable schema-per-tenant
   - Update your `Store::builder` to call `.tenant_strategy(TenantStrategy::SchemaPerTenant)` and provide a resolver (per-request tenant lookup).
   - Restart application nodes; watch logs for `tenant required` errors.
6. Post-migration checks
   - Verify metrics/traces include tenant labels.
   - Run smoke tests against each tenant (documents/events/projections).
   - Optionally remove the legacy single-tenant data once confirmed.
## License
Licensed under either of
- Apache License, Version 2.0 (LICENSE-APACHE)
- MIT license (LICENSE-MIT)
at your option.