pub struct StateStore { /* private fields */ }
Entry point for all persistent state. Supports two backends:
- SQLite (default): a single .rivet_state.db file next to the config. Good for local / single-node / dev deployments.
- PostgreSQL: a shared database addressed by RIVET_STATE_URL. Required for stateless container / Kubernetes deployments where the rivet pod is ephemeral or replicated.
Set the RIVET_STATE_URL environment variable to a PostgreSQL URL to
activate the Postgres backend:
RIVET_STATE_URL=postgresql://user:pass@host:5432/rivet_state
When the variable is absent or does not start with postgres, SQLite is
used and the variable is ignored.
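The selection rule above can be sketched as a small pure function. This is an illustration only: the Backend enum and both function names are hypothetical and are not part of the crate's API; the only behavior taken from the text is the "starts with postgres" check.

```rust
/// Hypothetical backend selector; not the crate's real type.
#[derive(Debug, PartialEq)]
enum Backend {
    /// Local .rivet_state.db file next to the config.
    Sqlite,
    /// Shared database at the given URL.
    Postgres(String),
}

/// Pick a backend from the value of RIVET_STATE_URL, if any.
/// A value that does not start with "postgres" is ignored.
fn select_backend(state_url: Option<&str>) -> Backend {
    match state_url {
        Some(url) if url.starts_with("postgres") => Backend::Postgres(url.to_string()),
        _ => Backend::Sqlite,
    }
}

/// Read the variable from the process environment and apply the rule.
fn backend_from_env() -> Backend {
    select_backend(std::env::var("RIVET_STATE_URL").ok().as_deref())
}
```

Keeping the rule in a function that takes the value as an argument makes it testable without touching the process environment.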
Implementations
impl StateStore
Chunk checkpoint store — reads and writes chunk_run and chunk_task.
Governs Invariant I5 (Chunk Task Acyclicity): transitions are strictly
forward (pending → running → completed | failed). Completed tasks are
never re-claimed. Failed tasks return to running only while
attempts < max_chunk_attempts.
Some methods accept an explicit state_ref and open a fresh connection.
This is required for parallel workers that cannot share one Connection.
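The claim-path rules stated for I5 can be written down as a transition check. This is a minimal sketch, not the crate's implementation: TaskStatus and transition_allowed are hypothetical names, and the explicit reset methods documented further down are deliberately outside this model.

```rust
/// Hypothetical mirror of the task statuses named above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TaskStatus {
    Pending,
    Running,
    Completed,
    Failed,
}

/// Returns true when `from -> to` is allowed on the claim path.
fn transition_allowed(
    from: TaskStatus,
    to: TaskStatus,
    attempts: u32,
    max_chunk_attempts: u32,
) -> bool {
    use TaskStatus::*;
    match (from, to) {
        (Pending, Running) => true,
        (Running, Completed) | (Running, Failed) => true,
        // A failed task may be retried only while attempts remain.
        (Failed, Running) => attempts < max_chunk_attempts,
        // Completed tasks are never re-claimed; everything else is rejected.
        _ => false,
    }
}
```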
pub fn list_export_names_with_in_progress_chunk_runs( &self, ) -> Result<Vec<String>>
Distinct export_name values that currently have at least one
chunk_run row with status = 'in_progress' (interrupted run — resume or reset).
pub fn find_in_progress_chunk_run( &self, export_name: &str, ) -> Result<Option<(String, String)>>
Latest in_progress chunk run for this export, if any.
pub fn create_chunk_run( &self, run_id: &str, export_name: &str, plan_hash: &str, max_chunk_attempts: u32, ) -> Result<()>
pub fn insert_chunk_tasks( &self, run_id: &str, ranges: &[(i64, i64)], ) -> Result<()>
pub fn reset_stale_running_chunk_tasks(&self, run_id: &str) -> Result<usize>
Mark tasks left running after a crash as pending so they can be retried.
pub fn claim_next_chunk_task( &self, run_id: &str, ) -> Result<Option<(i64, String, String)>>
Atomically claim the next pending or retryable failed chunk.
pub fn claim_next_chunk_task_at_ref( state_ref: &StateRef, run_id: &str, ) -> Result<Option<(i64, String, String)>>
Claim next chunk using a fresh connection identified by state_ref.
Used by parallel workers that cannot share a single connection.
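To illustrate the claim loop that parallel workers run, here is a sketch against an in-memory stand-in. Everything in it is an assumption made for illustration: MockTasks, claim_next, complete, and run_workers are hypothetical, a Mutex plays the role of the database's atomic claim, and the mock returns only a chunk index because the meaning of the real (i64, String, String) tuple is not documented on this page.

```rust
use std::sync::Mutex;
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Running,
    Completed,
}

/// Hypothetical in-memory stand-in for the chunk_task table.
struct MockTasks {
    rows: Mutex<Vec<Status>>,
}

impl MockTasks {
    fn new(n: usize) -> Self {
        MockTasks { rows: Mutex::new(vec![Status::Pending; n]) }
    }

    /// Claim the lowest-index pending task, flipping it to running
    /// while the lock is held so no two workers get the same chunk.
    fn claim_next(&self) -> Option<usize> {
        let mut rows = self.rows.lock().unwrap();
        let idx = rows.iter().position(|s| *s == Status::Pending)?;
        rows[idx] = Status::Running;
        Some(idx)
    }

    fn complete(&self, idx: usize) {
        self.rows.lock().unwrap()[idx] = Status::Completed;
    }
}

/// Run `workers` threads that claim and complete tasks until none remain.
/// Returns the chunk indexes each worker processed.
fn run_workers(store: &MockTasks, workers: usize) -> Vec<Vec<usize>> {
    thread::scope(|scope| {
        let handles: Vec<_> = (0..workers)
            .map(|_| {
                scope.spawn(move || {
                    let mut done = Vec::new();
                    while let Some(idx) = store.claim_next() {
                        store.complete(idx);
                        done.push(idx);
                    }
                    done
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}
```

In the real store each worker would open its own connection through a state_ref rather than share one object; the shared mock here only demonstrates that every chunk is claimed exactly once.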
pub fn complete_chunk_task( &self, run_id: &str, chunk_index: i64, rows_written: i64, file_name: Option<&str>, ) -> Result<()>
pub fn fail_chunk_task( &self, run_id: &str, chunk_index: i64, err: &str, ) -> Result<()>
pub fn fail_chunk_task_at_ref( state_ref: &StateRef, run_id: &str, chunk_index: i64, err: &str, ) -> Result<()>
pub fn complete_chunk_task_at_ref( state_ref: &StateRef, run_id: &str, chunk_index: i64, rows_written: i64, file_name: Option<&str>, ) -> Result<()>
pub fn count_chunk_tasks_total(&self, run_id: &str) -> Result<usize>
pub fn count_chunk_tasks_not_completed(&self, run_id: &str) -> Result<i64>
pub fn finalize_chunk_run_completed(&self, run_id: &str) -> Result<()>
pub fn reset_chunk_task_for_re_export( &self, run_id: &str, chunk_index: i64, reason: &str, ) -> Result<usize>
Reset a single completed chunk task back to pending so the next
--resume run re-exports it.
Used by ADR-0012 M8 reconciliation: when the destination’s manifest
says a part was committed but the actual object is missing or has
drifted (size mismatch), the chunk_task that produced it must run
again. Without this reset, claim_next_chunk_task would skip the
completed row and the destination would stay broken across resumes.
Distinct from reset_chunk_checkpoint(export_name) (which wipes
every run+task for an export — the operator-facing “abandon resume”
nuke) and from reset_stale_running_chunk_tasks (which only
rescues tasks left in ‘running’ after a crash). This one is
surgical: a single (run_id, chunk_index) goes from completed
back to pending, attempts reset to 0, file_name cleared, and
last_error annotated with the M8 reason so the journal/audit
trail records why.
Returns the number of rows updated (0 or 1). Idempotent — calling it on a non-completed task is a no-op.
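The effect described above can be modelled on a single in-memory row. This is a sketch under stated assumptions: ChunkTask and reset_for_re_export are hypothetical, the field names follow the prose, and the real method performs the update in the state database rather than on a struct.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Pending,
    Running,
    Completed,
    Failed,
}

/// Hypothetical in-memory row; field names follow the prose above.
#[derive(Debug, Clone, PartialEq)]
struct ChunkTask {
    status: Status,
    attempts: u32,
    file_name: Option<String>,
    last_error: Option<String>,
}

/// Returns the number of rows updated: 1 if the task was completed, else 0.
fn reset_for_re_export(task: &mut ChunkTask, reason: &str) -> usize {
    if task.status != Status::Completed {
        // No-op on anything that is not completed.
        return 0;
    }
    task.status = Status::Pending;
    task.attempts = 0;
    task.file_name = None;
    // Keep the reason so the audit trail records why the chunk reran.
    task.last_error = Some(reason.to_string());
    1
}
```

Calling it a second time returns 0 and leaves the row untouched, which is what makes the operation safe to repeat.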
pub fn reset_chunk_checkpoint(&self, export_name: &str) -> Result<usize>
Remove all chunk runs and tasks for an export (abandon resume).
pub fn get_latest_chunk_run( &self, export_name: &str, ) -> Result<Option<(String, String, String, String)>>
Latest chunk_run row for an export (any status), for rivet state chunks.
pub fn list_chunk_tasks_for_run( &self, run_id: &str, ) -> Result<Vec<ChunkTaskInfo>>
impl StateStore
Incremental cursor store — reads and writes export_state.
The cursor records the last extracted value so incremental runs can pick up where the previous run left off. Invariant I3 (Write Before Cursor) governs the ordering of cursor updates relative to destination writes.
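A minimal sketch of that ordering, assuming I3 means the cursor advances only after the destination write has succeeded (as its name suggests). The Run struct, incremental_run, and the integer cursor are all hypothetical; the real cursor is stored in export_state.

```rust
/// Hypothetical in-memory destination and cursor; not the crate's types.
struct Run {
    destination: Vec<i64>,
    cursor: Option<i64>,
}

/// Extract rows above the stored cursor, write them, then advance the cursor.
/// `write_ok` simulates whether the destination write succeeds.
fn incremental_run(run: &mut Run, source: &[i64], write_ok: bool) -> Result<usize, String> {
    let new_rows: Vec<i64> = source
        .iter()
        .copied()
        .filter(|v| run.cursor.map_or(true, |c| *v > c))
        .collect();
    if new_rows.is_empty() {
        return Ok(0);
    }
    if !write_ok {
        // Write failed: the cursor stays where it was, so the rows are retried.
        return Err("destination write failed".to_string());
    }
    run.destination.extend_from_slice(&new_rows);
    // Only now, after the write, is the cursor moved to the max value written.
    run.cursor = new_rows.iter().copied().max();
    Ok(new_rows.len())
}
```

Because the cursor never moves ahead of the destination, a failed run is repeated from the same point instead of silently skipping rows.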
impl StateStore
File log store — reads and writes file_log.
Historical note: this table was named file_manifest prior to schema v8.
The name was reclaimed for the 0.7.0 cloud-output JSON manifest contract;
the internal SQLite log was renamed to file_log to remove the overload.
Invariant I2 (Write Before Log) governs when record_file is called:
only after a destination write succeeds. Failed writes produce no log entry.
Invariant I7 (File-Log Failure Is Non-Fatal) means callers deliberately discard the result: let _ = record_file(...).
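How I2 and I7 combine at a call site can be sketched as follows. The three functions are hypothetical stand-ins: the real record_file signature is not shown on this page, and the boolean flags exist only to simulate success and failure.

```rust
/// Hypothetical stand-in for a destination write.
fn write_to_destination(ok: bool) -> Result<(), String> {
    if ok {
        Ok(())
    } else {
        Err("write failed".to_string())
    }
}

/// Hypothetical stand-in for the file log; `log_ok` simulates DB availability.
fn record_file(log: &mut Vec<String>, name: &str, log_ok: bool) -> Result<(), String> {
    if !log_ok {
        return Err("state DB unavailable".to_string());
    }
    log.push(name.to_string());
    Ok(())
}

/// Export one file: the write decides the outcome, the log entry is best effort.
fn export_file(
    log: &mut Vec<String>,
    name: &str,
    write_ok: bool,
    log_ok: bool,
) -> Result<(), String> {
    // I2: log only after the destination write has succeeded.
    write_to_destination(write_ok)?;
    // I7: a logging failure is deliberately discarded.
    let _ = record_file(log, name, log_ok);
    Ok(())
}
```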
impl StateStore
pub fn store_journal(&self, journal: &RunJournal) -> Result<()>
Persist a completed RunJournal to the state DB.
Called once per export run, after RunCompleted has been recorded.
Overwrites any existing row for the same run_id (idempotent on retry).
pub fn load_journal(&self, run_id: &str) -> Result<Option<RunJournal>>
Load a journal by run_id. Returns None if the run is not found.
pub fn recent_journals( &self, export_name: &str, limit: usize, ) -> Result<Vec<RunJournal>>
Return the most recent limit journal entries for an export, newest first.
impl StateStore
Metrics store — reads and writes export_metrics.
Invariant I4 (Metric After Verdict) governs when record_metric is called:
only after the terminal run outcome is determined.
pub fn record_metric( &self, export_name: &str, run_id: &str, duration_ms: i64, total_rows: i64, peak_rss_mb: Option<i64>, status: &str, error_message: Option<&str>, tuning_profile: Option<&str>, format: Option<&str>, mode: Option<&str>, files_produced: i64, bytes_written: i64, retries: i64, validated: Option<bool>, schema_changed: Option<bool>, ) -> Result<()>
pub fn get_metrics( &self, export_name: Option<&str>, limit: usize, ) -> Result<Vec<ExportMetric>>
impl StateStore
pub fn record_committed_incremental( &self, export_name: &str, cursor: &str, run_id: &str, ) -> Result<()>
Record a successful incremental commit: cursor is the max value written
to destination in this run.
pub fn record_committed_chunked( &self, export_name: &str, highest_chunk_index: i64, run_id: &str, ) -> Result<()>
Record a successful chunked-run commit: the highest completed chunk_index for this run.
pub fn record_verified_chunked( &self, export_name: &str, highest_chunk_index: i64, run_id: &str, ) -> Result<()>
Record a successful reconcile: all partitions in run_id matched.
pub fn get_progression(&self, export_name: &str) -> Result<ExportProgression>
pub fn list_progression(&self) -> Result<Vec<ExportProgression>>
impl StateStore
pub fn record_run_aggregate(&self, agg: &RunAggregate) -> Result<()>
Persist an aggregate. per_export is serialized as a JSON array into
details_json.
pub fn get_recent_run_aggregates( &self, limit: usize, ) -> Result<Vec<RunAggregate>>
Most-recent aggregates first.
impl StateStore
Schema history store — reads and writes export_schema.
Captures a schema snapshot per export on each run and surfaces structural drift (added/removed/retyped columns) by diffing against the stored snapshot.
pub fn get_stored_schema( &self, export_name: &str, ) -> Result<Option<Vec<SchemaColumn>>>
pub fn store_schema( &self, export_name: &str, columns: &[SchemaColumn], ) -> Result<()>
pub fn detect_schema_change( &self, export_name: &str, current: &[SchemaColumn], ) -> Result<Option<SchemaChange>>
Detect structural drift versus the stored snapshot.
On the first run (no stored snapshot) the current schema is stored and
Ok(None) is returned. On subsequent runs a diff is computed and returned
as Ok(Some(change)) when columns differ — but the stored snapshot is
not updated automatically. Callers must call store_schema explicitly
after deciding whether to accept the change (policy warn/continue) or
reject it (policy fail, which intentionally leaves the old snapshot so the
next run detects the same change again).
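The detect-then-decide contract can be sketched with an in-memory snapshot. This is an illustration under assumptions: Column, Change, SchemaStore, and diff_schema are hypothetical, since the fields of the real SchemaColumn and SchemaChange types are not shown on this page.

```rust
use std::collections::BTreeMap;

/// Hypothetical column shape.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    data_type: String,
}

/// Hypothetical diff result, listing column names per kind of drift.
#[derive(Debug, Default, PartialEq)]
struct Change {
    added: Vec<String>,
    removed: Vec<String>,
    retyped: Vec<String>,
}

/// Diff `current` against `stored`; returns None when nothing differs.
fn diff_schema(stored: &[Column], current: &[Column]) -> Option<Change> {
    let old: BTreeMap<&str, &str> =
        stored.iter().map(|c| (c.name.as_str(), c.data_type.as_str())).collect();
    let new: BTreeMap<&str, &str> =
        current.iter().map(|c| (c.name.as_str(), c.data_type.as_str())).collect();
    let mut change = Change::default();
    for (name, ty) in &new {
        match old.get(name) {
            None => change.added.push(name.to_string()),
            Some(old_ty) if old_ty != ty => change.retyped.push(name.to_string()),
            Some(_) => {}
        }
    }
    for name in old.keys() {
        if !new.contains_key(name) {
            change.removed.push(name.to_string());
        }
    }
    if change == Change::default() { None } else { Some(change) }
}

/// Hypothetical in-memory snapshot holder for one export.
struct SchemaStore {
    snapshot: Option<Vec<Column>>,
}

impl SchemaStore {
    /// First run stores the schema; later runs only report the diff.
    fn detect_change(&mut self, current: &[Column]) -> Option<Change> {
        if let Some(stored) = &self.snapshot {
            // The snapshot is left untouched; accepting the change is the caller's job.
            return diff_schema(stored, current);
        }
        self.snapshot = Some(current.to_vec());
        None
    }
}
```

Because the snapshot is not overwritten on detection, a run that rejects the change sees the same diff again next time.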
impl StateStore
pub fn get_shape_stats(&self, export_name: &str) -> Result<HashMap<String, u64>>
Return the stored per-column max byte lengths for export_name.
pub fn store_shape_stats( &self, export_name: &str, stats: &HashMap<String, u64>, ) -> Result<()>
Upsert per-column max byte lengths, keeping the running maximum.
pub fn detect_shape_drift( &self, export_name: &str, current: &HashMap<String, u64>, warn_factor: f64, ) -> Result<Vec<ShapeWarning>>
Compare current run’s per-column maxima against stored history.
Returns a warning for every column whose current_max > stored_max * warn_factor.
The stored maxima are updated to max(stored, current) unconditionally so that
the running high-water mark is always current.
First-run columns (no stored record) are silently accepted.
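The comparison and high-water-mark update can be sketched over two in-memory maps. Warning and detect_drift are hypothetical names (the fields of the real ShapeWarning are not shown here), and the sketch assumes that a first-run column is stored with its current maximum.

```rust
use std::collections::HashMap;

/// Hypothetical warning record.
#[derive(Debug, PartialEq)]
struct Warning {
    column: String,
    stored_max: u64,
    current_max: u64,
}

/// Warn on columns whose current max exceeds stored max * warn_factor,
/// then raise every stored max to max(stored, current).
fn detect_drift(
    stored: &mut HashMap<String, u64>,
    current: &HashMap<String, u64>,
    warn_factor: f64,
) -> Vec<Warning> {
    let mut warnings = Vec::new();
    for (column, &current_max) in current {
        // Columns with no stored record produce no warning.
        if let Some(stored_max) = stored.get(column).copied() {
            if (current_max as f64) > (stored_max as f64) * warn_factor {
                warnings.push(Warning { column: column.clone(), stored_max, current_max });
            }
        }
        // Keep the running high-water mark, warning or not.
        let slot = stored.entry(column.clone()).or_insert(current_max);
        *slot = (*slot).max(current_max);
    }
    // HashMap iteration order is unspecified; sort for stable output.
    warnings.sort_by(|a, b| a.column.cmp(&b.column));
    warnings
}
```

With warn_factor = 2.0, a column that grows from 100 to 250 bytes triggers a warning, while growth from 50 to 60 does not; both stored maxima are raised either way.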
impl StateStore
pub fn open(config_path: &str) -> Result<Self>
Open the appropriate backend.
Checks RIVET_STATE_URL; falls back to SQLite next to config_path.
pub fn state_db_path(config_path: &str) -> PathBuf
Path to .rivet_state.db for SQLite deployments. For Postgres it returns the
config directory path, which is not meaningful for connecting and exists only
for legacy callers; prefer state_ref() in new code.
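For the SQLite case, "next to the config" could be computed as below. This is a sketch only: sqlite_path_next_to is a hypothetical helper, and whether the real function resolves the directory exactly this way is an assumption.

```rust
use std::path::{Path, PathBuf};

/// Sketch: place .rivet_state.db in the same directory as the config file.
fn sqlite_path_next_to(config_path: &str) -> PathBuf {
    // A bare file name has an empty parent, which resolves to the current directory.
    let dir = Path::new(config_path)
        .parent()
        .unwrap_or_else(|| Path::new(""));
    dir.join(".rivet_state.db")
}
```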
pub fn state_ref(&self) -> &StateRef
Serialisable connection reference for parallel chunk workers.
pub fn open_in_memory() -> Result<Self>
In-memory SQLite store for unit tests.
pub fn open_at_path(db_path: &Path) -> Result<Self>
Open a SQLite store at an explicit file path (for tests that need
cross-connection access via claim_next_chunk_task_at_path).