Struct StateStore 

pub struct StateStore { /* private fields */ }

Entry point for all persistent state. Supports two backends:

  • SQLite (default) — a single .rivet_state.db file next to the config. Good for local / single-node / dev deployments.
  • PostgreSQL — a shared database addressed by RIVET_STATE_URL. Required for stateless container / Kubernetes deployments where the rivet pod is ephemeral or replicated.

Set the RIVET_STATE_URL environment variable to a PostgreSQL URL to activate the Postgres backend:

RIVET_STATE_URL=postgresql://user:pass@host:5432/rivet_state

When the variable is absent or does not start with postgres, SQLite is used and the variable is ignored.
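The selection rule can be sketched as a pure function. This is a minimal illustration, assuming the check is a plain prefix test on the variable's value (so both postgres:// and postgresql:// qualify). The Backend enum and select_backend function are hypothetical names; the real logic lives inside StateStore::open, and the config path used below is made up.

```rust
use std::env;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for the two state backends.
#[derive(Debug, PartialEq)]
enum Backend {
    Sqlite(PathBuf),
    Postgres(String),
}

/// Mirrors the documented rule: a value starting with "postgres" selects
/// PostgreSQL; anything else (or no value at all) falls back to SQLite.
fn select_backend(state_url: Option<&str>, config_path: &str) -> Backend {
    match state_url {
        Some(url) if url.starts_with("postgres") => Backend::Postgres(url.to_string()),
        _ => {
            // The SQLite file lives next to the config file.
            let dir = Path::new(config_path).parent().unwrap_or_else(|| Path::new("."));
            Backend::Sqlite(dir.join(".rivet_state.db"))
        }
    }
}

fn main() {
    let url = env::var("RIVET_STATE_URL").ok();
    println!("{:?}", select_backend(url.as_deref(), "configs/rivet.yaml"));
}
```

A non-Postgres value such as mysql://... is not an error under this rule; it is silently ignored and SQLite is used.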

Implementations

impl StateStore

Chunk checkpoint store — reads and writes chunk_run and chunk_task.

Governs Invariant I5 (Chunk Task Acyclicity): transitions are strictly forward (pending → running → completed | failed). Completed tasks are never re-claimed. Failed tasks return to running only while attempts < max_chunk_attempts.
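The transition rules of Invariant I5 can be written as a small predicate. This is a sketch under stated assumptions: ChunkStatus and is_allowed are hypothetical names, and attempts is assumed to count the claims already made for the task. Only the normal forward path and the bounded failed → running retry are modelled; the out-of-band recovery resets (reset_stale_running_chunk_tasks, reset_chunk_task_for_re_export) are not.

```rust
/// Hypothetical mirror of the chunk_task status column.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ChunkStatus {
    Pending,
    Running,
    Completed,
    Failed,
}

/// True if a worker may move a task from `from` to `to` under I5.
/// `attempts` is assumed to count the claims already made for the task.
fn is_allowed(from: ChunkStatus, to: ChunkStatus, attempts: u32, max_chunk_attempts: u32) -> bool {
    match (from, to) {
        // Forward path: pending -> running -> completed | failed.
        (ChunkStatus::Pending, ChunkStatus::Running)
        | (ChunkStatus::Running, ChunkStatus::Completed)
        | (ChunkStatus::Running, ChunkStatus::Failed) => true,
        // Bounded retry: failed -> running only while attempts remain.
        (ChunkStatus::Failed, ChunkStatus::Running) => attempts < max_chunk_attempts,
        // Everything else, including any move out of Completed, is rejected.
        _ => false,
    }
}

fn main() {
    let ok = is_allowed(ChunkStatus::Failed, ChunkStatus::Running, 1, 3);
    println!("failed -> running with 1 of 3 attempts used: {ok}");
}
```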

Some methods accept an explicit state_ref and open a fresh connection. This is required for parallel workers that cannot share one Connection.

pub fn list_export_names_with_in_progress_chunk_runs( &self, ) -> Result<Vec<String>>

Distinct export_name values that currently have at least one chunk_run row with status = 'in_progress', i.e. an interrupted run that must be resumed or reset.

pub fn find_in_progress_chunk_run( &self, export_name: &str, ) -> Result<Option<(String, String)>>

Latest in_progress chunk run for this export, if any.

pub fn create_chunk_run( &self, run_id: &str, export_name: &str, plan_hash: &str, max_chunk_attempts: u32, ) -> Result<()>

pub fn insert_chunk_tasks( &self, run_id: &str, ranges: &[(i64, i64)], ) -> Result<()>

pub fn reset_stale_running_chunk_tasks(&self, run_id: &str) -> Result<usize>

Mark tasks left running after a crash as pending so they can be retried.

pub fn claim_next_chunk_task( &self, run_id: &str, ) -> Result<Option<(i64, String, String)>>

Atomically claim the next pending or retryable failed chunk.

pub fn claim_next_chunk_task_at_ref( state_ref: &StateRef, run_id: &str, ) -> Result<Option<(i64, String, String)>>

Claim next chunk using a fresh connection identified by state_ref. Used by parallel workers that cannot share a single connection.
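The claim-until-exhausted loop that parallel workers run can be sketched with the standard library alone. Here an Arc<Mutex<Vec<...>>> is a hypothetical stand-in for the state database and claim_next plays the role of claim_next_chunk_task_at_ref: each call atomically moves one pending task to running and returns its index, or None when nothing is left. In rivet each worker would instead receive a StateRef and open its own connection; connections are not modelled here.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical mirror of the chunk_task status values used here.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Status {
    Pending,
    Running,
    Completed,
}

/// Hypothetical stand-in for the chunk_task table: (chunk_index, status).
type Tasks = Arc<Mutex<Vec<(i64, Status)>>>;

/// Claim the first pending task in table order. Holding the lock across
/// find-and-update makes the claim atomic, as a transactional UPDATE would.
fn claim_next(tasks: &Tasks) -> Option<i64> {
    let mut table = tasks.lock().unwrap();
    let task = table.iter_mut().find(|(_, s)| *s == Status::Pending)?;
    task.1 = Status::Running;
    Some(task.0)
}

fn complete(tasks: &Tasks, chunk_index: i64) {
    let mut table = tasks.lock().unwrap();
    if let Some(task) = table.iter_mut().find(|(i, _)| *i == chunk_index) {
        task.1 = Status::Completed;
    }
}

/// Spawn `workers` threads that claim tasks until none remain.
/// Returns how many tasks each worker processed.
fn run_workers(tasks: Tasks, workers: usize) -> Vec<usize> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Each worker owns its handle (rivet would pass a StateRef instead).
            let tasks = Arc::clone(&tasks);
            thread::spawn(move || {
                let mut done: usize = 0;
                while let Some(idx) = claim_next(&tasks) {
                    // ... export the chunk here ...
                    complete(&tasks, idx);
                    done += 1;
                }
                done
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let tasks: Tasks = Arc::new(Mutex::new((0..10).map(|i| (i, Status::Pending)).collect()));
    println!("per-worker counts: {:?}", run_workers(Arc::clone(&tasks), 4));
}
```

Because every claim is atomic, no chunk is processed twice, but how the ten chunks are split across the four workers varies from run to run.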

pub fn complete_chunk_task( &self, run_id: &str, chunk_index: i64, rows_written: i64, file_name: Option<&str>, ) -> Result<()>

pub fn fail_chunk_task( &self, run_id: &str, chunk_index: i64, err: &str, ) -> Result<()>

pub fn fail_chunk_task_at_ref( state_ref: &StateRef, run_id: &str, chunk_index: i64, err: &str, ) -> Result<()>

pub fn complete_chunk_task_at_ref( state_ref: &StateRef, run_id: &str, chunk_index: i64, rows_written: i64, file_name: Option<&str>, ) -> Result<()>

pub fn count_chunk_tasks_total(&self, run_id: &str) -> Result<usize>

pub fn count_chunk_tasks_not_completed(&self, run_id: &str) -> Result<i64>

pub fn finalize_chunk_run_completed(&self, run_id: &str) -> Result<()>

pub fn reset_chunk_task_for_re_export( &self, run_id: &str, chunk_index: i64, reason: &str, ) -> Result<usize>

Reset a single completed chunk task back to pending so the next --resume run re-exports it.

Used by ADR-0012 M8 reconciliation: when the destination’s manifest says a part was committed but the actual object is missing or has drifted (size mismatch), the chunk_task that produced it must run again. Without this reset, claim_next_chunk_task would skip the completed row and the destination would stay broken across resumes.

Distinct from reset_chunk_checkpoint(export_name) (which wipes every run+task for an export — the operator-facing “abandon resume” nuke) and from reset_stale_running_chunk_tasks (which only rescues tasks left in ‘running’ after a crash). This one is surgical: a single (run_id, chunk_index) goes from completed back to pending, attempts reset to 0, file_name cleared, and last_error annotated with the M8 reason so the journal/audit trail records why.

Returns the number of rows updated (0 or 1). Idempotent — calling it on a non-completed task is a no-op.
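The surgical reset can be modelled on in-memory rows to make the idempotence contract concrete. TaskRow, its field set, and the exact last_error wording are hypothetical; only the documented effects are reproduced: completed → pending, attempts set to 0, file_name cleared, last_error annotated, and a return value of 0 or 1.

```rust
/// Hypothetical mirror of the chunk_task status column.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq)]
enum Status {
    Pending,
    Running,
    Completed,
    Failed,
}

/// Hypothetical in-memory mirror of one chunk_task row.
#[derive(Clone, Debug)]
struct TaskRow {
    run_id: String,
    chunk_index: i64,
    status: Status,
    attempts: u32,
    file_name: Option<String>,
    last_error: Option<String>,
}

/// Reset one completed (run_id, chunk_index) row back to pending.
/// Returns rows changed: 1 on success, 0 if the row is absent or not
/// completed, which makes repeated calls harmless.
fn reset_for_re_export(rows: &mut [TaskRow], run_id: &str, chunk_index: i64, reason: &str) -> usize {
    let mut updated = 0;
    for row in rows.iter_mut() {
        let is_target = row.run_id == run_id && row.chunk_index == chunk_index;
        if is_target && row.status == Status::Completed {
            row.status = Status::Pending;
            row.attempts = 0;
            row.file_name = None;
            // Record why the chunk is being redone (wording is hypothetical).
            row.last_error = Some(format!("M8 reconcile: {reason}"));
            updated += 1;
        }
    }
    updated
}

fn main() {
    let mut rows = vec![TaskRow {
        run_id: "r1".into(),
        chunk_index: 3,
        status: Status::Completed,
        attempts: 1,
        file_name: Some("part-3".into()),
        last_error: None,
    }];
    let first = reset_for_re_export(&mut rows, "r1", 3, "size mismatch");
    let second = reset_for_re_export(&mut rows, "r1", 3, "size mismatch");
    println!("{first} {second} {:?}", rows[0].status);
}
```

The second call matches no completed row, so it changes nothing and returns 0.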

pub fn reset_chunk_checkpoint(&self, export_name: &str) -> Result<usize>

Remove all chunk runs and tasks for an export (abandon resume).

pub fn get_latest_chunk_run( &self, export_name: &str, ) -> Result<Option<(String, String, String, String)>>

Latest chunk_run row for an export (any status), used by the rivet state chunks command.

pub fn list_chunk_tasks_for_run( &self, run_id: &str, ) -> Result<Vec<ChunkTaskInfo>>

impl StateStore

Incremental cursor store — reads and writes export_state.

The cursor records the last extracted value so incremental runs can pick up where the previous run left off. Invariant I3 (Write Before Cursor) governs the ordering of cursor updates relative to destination writes.
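Invariant I3 is an ordering rule: advance the cursor only after the destination write has succeeded, so a failed write leaves the previous cursor in place and the next run re-reads the same rows. A minimal sketch, assuming a hypothetical write_batch closure for the destination and a HashMap standing in for export_state; the export name and cursor values are made up.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for export_state: export_name -> last committed cursor.
type CursorTable = HashMap<String, String>;

/// Write one batch, then advance the cursor to `max_cursor`.
/// `write_batch` is a hypothetical closure standing in for the destination write.
fn run_incremental<F>(
    cursors: &mut CursorTable,
    export_name: &str,
    max_cursor: &str,
    write_batch: F,
) -> Result<(), String>
where
    F: FnOnce() -> Result<(), String>,
{
    // Destination write first: on error, `?` returns before the cursor is touched.
    write_batch()?;
    // Cursor update second (I3: Write Before Cursor).
    cursors.insert(export_name.to_string(), max_cursor.to_string());
    Ok(())
}

fn main() {
    let mut cursors = CursorTable::new();
    let _ = run_incremental(&mut cursors, "orders", "100", || Ok(()));
    let failed = run_incremental(&mut cursors, "orders", "200", || Err("upload failed".to_string()));
    println!("{:?} {:?}", failed, cursors.get("orders"));
}
```

With this ordering, a crash between the write and the cursor update causes the last batch to be exported again rather than skipped: delivery is at-least-once, not at-most-once.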

pub fn get(&self, export_name: &str) -> Result<CursorState>

pub fn update(&self, export_name: &str, cursor_value: &str) -> Result<()>

pub fn reset(&self, export_name: &str) -> Result<()>

pub fn list_all(&self) -> Result<Vec<CursorState>>

impl StateStore

File log store — reads and writes file_log.

Historical note: this table was named file_manifest prior to schema v8. The name was reclaimed for the 0.7.0 cloud-output JSON manifest contract; the internal SQLite log was renamed to file_log to remove the overload.

Invariant I2 (Write Before Log) governs when record_file is called: only after a destination write succeeds, so failed writes produce no log entry. Invariant I7 (File-Log Failure Is Non-Fatal) means callers discard its result with let _ = record_file(...), so an error while logging never fails the export.
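I2 and I7 combine into one calling pattern: log only after a successful write, and ignore any error from the log itself. The sketch below uses hypothetical write_file and record_file closures in place of the real destination and StateStore::record_file; the let _ = binding is the idiom the invariant describes.

```rust
/// Export one file, then log it. Returns the bytes written.
/// `write_file` and `record_file` are hypothetical closures.
fn export_one<W, L>(write_file: W, record_file: L) -> Result<u64, String>
where
    W: FnOnce() -> Result<u64, String>,   // destination write, returns bytes written
    L: FnOnce(u64) -> Result<(), String>, // file_log insert
{
    // I2: a failed write returns early, so no log entry is produced.
    let bytes = write_file()?;
    // I7: a file-log failure must not fail the export, so its result is discarded.
    let _ = record_file(bytes);
    Ok(bytes)
}

fn main() {
    let result = export_one(|| Ok(1024), |_bytes| Err("state db locked".to_string()));
    println!("{:?}", result);
}
```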

pub fn record_file( &self, run_id: &str, export_name: &str, file_name: &str, row_count: i64, bytes: i64, format: &str, compression: Option<&str>, ) -> Result<()>

pub fn get_files( &self, export_name: Option<&str>, limit: usize, ) -> Result<Vec<FileRecord>>

impl StateStore

pub fn store_journal(&self, journal: &RunJournal) -> Result<()>

Persist a completed RunJournal to the state DB.

Called once per export run, after RunCompleted has been recorded. Overwrites any existing row for the same run_id (idempotent on retry).

pub fn load_journal(&self, run_id: &str) -> Result<Option<RunJournal>>

Load a journal by run_id. Returns None if the run is not found.

pub fn recent_journals( &self, export_name: &str, limit: usize, ) -> Result<Vec<RunJournal>>

Return at most limit of the most recent journal entries for an export, newest first.
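The journal contract (one row per run_id, overwritten on retry; reads newest first, capped by a limit) can be sketched over a HashMap. Journal and its finished_at field are hypothetical simplifications of RunJournal, whose fields are not documented here, and ordering by a finish timestamp is an assumption about what "newest" means.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for RunJournal.
#[derive(Clone, Debug, PartialEq)]
struct Journal {
    run_id: String,
    export_name: String,
    finished_at: u64, // assumed ordering key, e.g. Unix seconds
}

#[derive(Default)]
struct JournalTable {
    rows: HashMap<String, Journal>, // keyed by run_id
}

impl JournalTable {
    /// Upsert: storing the same run_id twice keeps a single row.
    fn store(&mut self, journal: &Journal) {
        self.rows.insert(journal.run_id.clone(), journal.clone());
    }

    /// None when the run_id is unknown.
    fn load(&self, run_id: &str) -> Option<Journal> {
        self.rows.get(run_id).cloned()
    }

    /// At most `limit` journals for one export, newest first.
    fn recent(&self, export_name: &str, limit: usize) -> Vec<Journal> {
        let mut out: Vec<Journal> = self
            .rows
            .values()
            .filter(|j| j.export_name == export_name)
            .cloned()
            .collect();
        out.sort_by(|a, b| b.finished_at.cmp(&a.finished_at));
        out.truncate(limit);
        out
    }
}

fn main() {
    let mut table = JournalTable::default();
    table.store(&Journal { run_id: "r1".into(), export_name: "orders".into(), finished_at: 100 });
    println!("{:?} {:?}", table.recent("orders", 10), table.load("missing"));
}
```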

impl StateStore

Metrics store — reads and writes export_metrics.

Invariant I4 (Metric After Verdict) governs when record_metric is called: only after the terminal run outcome is determined.

pub fn record_metric( &self, export_name: &str, run_id: &str, duration_ms: i64, total_rows: i64, peak_rss_mb: Option<i64>, status: &str, error_message: Option<&str>, tuning_profile: Option<&str>, format: Option<&str>, mode: Option<&str>, files_produced: i64, bytes_written: i64, retries: i64, validated: Option<bool>, schema_changed: Option<bool>, ) -> Result<()>

pub fn get_metrics( &self, export_name: Option<&str>, limit: usize, ) -> Result<Vec<ExportMetric>>

impl StateStore

pub fn record_committed_incremental( &self, export_name: &str, cursor: &str, run_id: &str, ) -> Result<()>

Record a successful incremental commit: cursor is the maximum value written to the destination in this run.

pub fn record_committed_chunked( &self, export_name: &str, highest_chunk_index: i64, run_id: &str, ) -> Result<()>

Record a successful chunked-run commit: the highest completed chunk_index for this run.

pub fn record_verified_chunked( &self, export_name: &str, highest_chunk_index: i64, run_id: &str, ) -> Result<()>

Record a successful reconcile: all partitions in run_id matched.

pub fn get_progression(&self, export_name: &str) -> Result<ExportProgression>

pub fn list_progression(&self) -> Result<Vec<ExportProgression>>

impl StateStore

pub fn record_run_aggregate(&self, agg: &RunAggregate) -> Result<()>

Persist an aggregate. per_export is serialized as a JSON array into details_json.

pub fn get_recent_run_aggregates( &self, limit: usize, ) -> Result<Vec<RunAggregate>>

Return at most limit aggregates, most recent first.

impl StateStore

Schema history store — reads and writes export_schema.

Captures a schema snapshot per export on each run and surfaces structural drift (added/removed/retyped columns) by diffing against the stored snapshot.

pub fn get_stored_schema( &self, export_name: &str, ) -> Result<Option<Vec<SchemaColumn>>>

pub fn store_schema( &self, export_name: &str, columns: &[SchemaColumn], ) -> Result<()>

pub fn detect_schema_change( &self, export_name: &str, current: &[SchemaColumn], ) -> Result<Option<SchemaChange>>

Detect structural drift versus the stored snapshot.

On the first run (no stored snapshot), the current schema is stored and Ok(None) is returned. On subsequent runs, a diff is computed and returned as Ok(Some(change)) when the columns differ, but the stored snapshot is not updated automatically. Callers must call store_schema explicitly after deciding whether to accept the change (policy warn/continue) or reject it (policy fail, which intentionally leaves the old snapshot in place so the next run detects the same change again).
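The store-on-first-run, never-on-drift contract can be sketched as follows. Column, Change, and the in-memory snapshot map are hypothetical stand-ins for SchemaColumn, SchemaChange, and the export_schema table; the diff sorts columns into added, removed, and retyped, as described above.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical stand-in for SchemaColumn.
#[derive(Clone, Debug, PartialEq)]
struct Column {
    name: String,
    ty: String,
}

/// Hypothetical stand-in for SchemaChange.
#[derive(Debug, Default, PartialEq)]
struct Change {
    added: Vec<String>,
    removed: Vec<String>,
    retyped: Vec<String>,
}

fn detect_schema_change(
    snapshots: &mut HashMap<String, Vec<Column>>,
    export_name: &str,
    current: &[Column],
) -> Option<Change> {
    // First run: store the current schema and report no change.
    if !snapshots.contains_key(export_name) {
        snapshots.insert(export_name.to_string(), current.to_vec());
        return None;
    }
    let stored = &snapshots[export_name];
    let old: BTreeMap<&str, &str> = stored.iter().map(|c| (c.name.as_str(), c.ty.as_str())).collect();
    let new: BTreeMap<&str, &str> = current.iter().map(|c| (c.name.as_str(), c.ty.as_str())).collect();

    let mut change = Change::default();
    for (name, ty) in &new {
        match old.get(name) {
            None => change.added.push(name.to_string()),
            Some(old_ty) if old_ty != ty => change.retyped.push(name.to_string()),
            _ => {}
        }
    }
    for name in old.keys() {
        if !new.contains_key(name) {
            change.removed.push(name.to_string());
        }
    }
    // The snapshot is deliberately left untouched; the caller decides.
    if change == Change::default() { None } else { Some(change) }
}

fn main() {
    let mut snapshots = HashMap::new();
    let v1 = vec![Column { name: "id".into(), ty: "int8".into() }];
    let v2 = vec![Column { name: "id".into(), ty: "text".into() }];
    println!("{:?}", detect_schema_change(&mut snapshots, "orders", &v1));
    println!("{:?}", detect_schema_change(&mut snapshots, "orders", &v2));
}
```

Calling the function twice with the same drifted schema reports the same change both times, which is exactly what lets the fail policy re-trigger on the next run.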

impl StateStore

pub fn get_shape_stats(&self, export_name: &str) -> Result<HashMap<String, u64>>

Return the stored per-column max byte lengths for export_name.

pub fn store_shape_stats( &self, export_name: &str, stats: &HashMap<String, u64>, ) -> Result<()>

Upsert per-column max byte lengths, keeping the running maximum.

pub fn detect_shape_drift( &self, export_name: &str, current: &HashMap<String, u64>, warn_factor: f64, ) -> Result<Vec<ShapeWarning>>

Compare the current run's per-column maxima against the stored history.

Returns a warning for every column whose current_max > stored_max * warn_factor. The stored maxima are updated to max(stored, current) unconditionally so that the running high-water mark is always current.

First-run columns (no stored record) are silently accepted.
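The drift rule is a comparison followed by an unconditional high-water-mark update. A sketch over plain HashMaps, where Warning is a hypothetical stand-in for ShapeWarning; sorting the warnings by column name is an addition here so the output order is deterministic.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for ShapeWarning.
#[derive(Debug, PartialEq)]
struct Warning {
    column: String,
    stored_max: u64,
    current_max: u64,
}

fn detect_shape_drift(
    stored: &mut HashMap<String, u64>,
    current: &HashMap<String, u64>,
    warn_factor: f64,
) -> Vec<Warning> {
    let mut warnings = Vec::new();
    for (column, &cur) in current {
        // Known column that grew past the allowed factor: warn.
        // First-run columns (no stored value) fall through silently.
        if let Some(prev) = stored.get(column).copied() {
            if cur as f64 > prev as f64 * warn_factor {
                warnings.push(Warning { column: column.clone(), stored_max: prev, current_max: cur });
            }
        }
        // Always keep the running maximum, warning or not.
        let entry = stored.entry(column.clone()).or_insert(cur);
        *entry = (*entry).max(cur);
    }
    warnings.sort_by(|a, b| a.column.cmp(&b.column));
    warnings
}

fn main() {
    let mut stored: HashMap<String, u64> = HashMap::from([("name".to_string(), 10)]);
    let current = HashMap::from([("name".to_string(), 25)]);
    println!("{:?}", detect_shape_drift(&mut stored, &current, 2.0));
}
```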

impl StateStore

pub fn open(config_path: &str) -> Result<Self>

Open the appropriate backend.

Checks RIVET_STATE_URL; falls back to SQLite next to config_path.

pub fn state_db_path(config_path: &str) -> PathBuf

Path to .rivet_state.db for SQLite deployments. For Postgres deployments it returns the config directory path instead; that path is not meaningful as a connection target and is kept only for legacy callers. Prefer state_ref() in new code.

pub fn state_ref(&self) -> &StateRef

Serialisable connection reference for parallel chunk workers.

pub fn open_in_memory() -> Result<Self>

In-memory SQLite store for unit tests.

pub fn open_at_path(db_path: &Path) -> Result<Self>

Open a SQLite store at an explicit file path, for tests that need cross-connection access via claim_next_chunk_task_at_path.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
where ST: ?Sized, DT: ?Sized,

impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
where ST: ?Sized, DT: ?Sized,

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> MaybeSend for T
where T: Send,

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Read<Exclusive, BecauseExclusive> for T
where T: ?Sized,

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.