faucet-state-postgres
PostgreSQL-backed StateStore for the faucet-stream ecosystem. Persists incremental-replication bookmarks across pipeline runs in a single JSONB-backed table, so sources can resume exactly where they left off after a crash, restart, or scheduled invocation.
Install
[dependencies]
= "0.2"
= "0.2"
Usage
use std::sync::Arc;
use ;
use faucet_state_postgres::PostgresStateStore;
# async
Storage layout
CREATE TABLE IF NOT EXISTS faucet_state (
    key TEXT PRIMARY KEY,
    value JSONB NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
ensure_table() runs the CREATE TABLE IF NOT EXISTS for you. If you manage schema separately, skip it.
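If you manage the schema yourself under a custom table name, the DDL above can be generated rather than copied by hand. The following is a minimal sketch, not the crate's implementation: `create_table_sql` is a hypothetical helper, and it assumes the table name has already been validated as described under Validation.

```rust
/// Builds the storage-layout DDL for a given table name.
/// Hypothetical helper for illustration; not part of the crate's API.
fn create_table_sql(table: &str) -> String {
    // Double-quote the identifier, doubling any embedded quote characters.
    let quoted = format!("\"{}\"", table.replace('"', "\"\""));
    format!(
        "CREATE TABLE IF NOT EXISTS {quoted} (\n    \
         key TEXT PRIMARY KEY,\n    \
         value JSONB NOT NULL,\n    \
         updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()\n)"
    )
}
```

The resulting string can go into a migration file or be run through whatever migration tool you already use.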
Operations
- get(key): SELECT value FROM <table> WHERE key = $1. Returns None when no row exists.
- put(key, value): INSERT ... ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW().
- delete(key): DELETE FROM <table> WHERE key = $1. A missing row is not an error.
The pool is configured via connect_with(url, max_connections, table); the default connect(url) uses 5 connections and the faucet_state table.
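The contract of the three operations can be modelled without a database. The sketch below is an in-memory stand-in, useful for reasoning about the semantics or for tests. `InMemoryState` is a hypothetical type, values are kept as raw JSON text, and the real store's methods are presumably async and fallible, which this model ignores.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the state table: key -> JSON text.
/// Hypothetical type for illustration; the real store talks to Postgres.
#[derive(Default)]
struct InMemoryState {
    rows: HashMap<String, String>,
}

impl InMemoryState {
    /// Mirrors the SELECT: returns None when no row exists.
    fn get(&self, key: &str) -> Option<&str> {
        self.rows.get(key).map(String::as_str)
    }

    /// Mirrors the upsert: inserts a new row or overwrites the existing one.
    fn put(&mut self, key: &str, value: &str) {
        self.rows.insert(key.to_owned(), value.to_owned());
    }

    /// Mirrors the DELETE: removing a missing key is a no-op, not an error.
    fn delete(&mut self, key: &str) {
        self.rows.remove(key);
    }
}
```

Writing the same key twice leaves a single row holding the latest value, which is the property that lets a source overwrite its bookmark on every run.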
Configuration
| Option | Default | Description |
|---|---|---|
| max_connections | 5 | Size of the sqlx::PgPool. State writes are cheap; the default is intentionally small. |
| table | faucet_state | Override the table name. Validated as a Postgres identifier (letters/digits/underscore, ≤ 63 chars). |
Validation
- State keys are validated by faucet_core::state::validate_state_key (ASCII alphanumerics plus _ - : ., max 256 characters).
- Table identifiers are validated against the Postgres rules to keep error messages clear and avoid SQL injection through misconfiguration. Identifiers are also double-quoted via faucet_core::util::quote_ident.
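The rules above can be sketched in a few lines of Rust. This is an illustration of the stated rules only, not the faucet_core implementation: all three function names are hypothetical, rejecting empty input is an assumption, and "letters" is assumed to mean ASCII letters. The real validators may be stricter, for example about a leading digit in a table name.

```rust
/// State-key rule: ASCII alphanumerics plus `_ - : .`, at most 256 characters.
/// Rejecting the empty key is an assumption of this sketch.
fn is_valid_state_key(key: &str) -> bool {
    !key.is_empty()
        && key.len() <= 256
        && key
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | ':' | '.'))
}

/// Table-identifier rule: letters, digits, underscore, at most 63 characters.
fn is_valid_table_ident(name: &str) -> bool {
    !name.is_empty()
        && name.len() <= 63
        && name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
}

/// Double-quotes an identifier, doubling any embedded quote characters.
fn double_quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}
```

Validating before quoting gives two layers of defence: a malformed table name is rejected with a clear error, and even a name that slipped through could not break out of the quoted identifier.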
License
Licensed under either of MIT or Apache-2.0 at your option.