Skip to main content

PostgresStore

Struct PostgresStore 

Source
pub struct PostgresStore { /* private fields */ }

Implementations§

Source§

impl PostgresStore

Source

pub async fn new(url: &str) -> Result<Self>

Source

pub async fn try_acquire_leader_lock(&self) -> Result<bool>

Try to acquire pg_advisory_lock for leader election. Returns true if this instance is the leader (scheduler should run).

Trait Implementations§

Source§

impl WorkflowStore for PostgresStore

Source§

async fn create_namespace(&self, name: &str) -> Result<()>

Source§

async fn list_namespaces(&self) -> Result<Vec<NamespaceRecord>>

Source§

async fn delete_namespace(&self, name: &str) -> Result<bool>

Source§

async fn get_namespace_stats(&self, namespace: &str) -> Result<NamespaceStats>

Source§

async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()>

Source§

async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>>

Source§

async fn list_workflows( &self, namespace: &str, status: Option<WorkflowStatus>, workflow_type: Option<&str>, search_attrs_filter: Option<&str>, limit: i64, offset: i64, ) -> Result<Vec<WorkflowRecord>>

Source§

async fn update_workflow_status( &self, id: &str, status: WorkflowStatus, result: Option<&str>, error: Option<&str>, ) -> Result<()>

Source§

async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool>

Source§

async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()>

Mark a workflow as having new events that need a worker to replay it. Idempotent — calling repeatedly is fine. Cleared by claim_workflow_task.
Source§

async fn claim_workflow_task( &self, task_queue: &str, worker_id: &str, ) -> Result<Option<WorkflowRecord>>

Atomically claim the oldest dispatchable workflow on a queue. Sets dispatch_claimed_by and dispatch_last_heartbeat, clears needs_dispatch. Returns the workflow record or None if nothing is available.
Source§

async fn release_workflow_task( &self, workflow_id: &str, worker_id: &str, ) -> Result<()>

Release a workflow task’s dispatch lease (called when the worker submits its batch of commands). Only succeeds if dispatch_claimed_by matches the calling worker.
Source§

async fn release_stale_dispatch_leases( &self, now: f64, timeout_secs: f64, ) -> Result<u64>

Forcibly release dispatch leases whose worker hasn’t sent a heartbeat within timeout_secs. Used by the engine’s background poller to recover from worker crashes. Returns the number of leases released (each becomes claimable again, with needs_dispatch=true).
Source§

async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64>

Source§

async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>>

Source§

async fn get_event_count(&self, workflow_id: &str) -> Result<i64>

Source§

async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64>

Source§

async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>>

Look up an activity by its primary key.
Source§

async fn get_activity_by_workflow_seq( &self, workflow_id: &str, seq: i32, ) -> Result<Option<WorkflowActivity>>

Look up an activity by its workflow-relative sequence number. Used for idempotent scheduling: the engine checks if (workflow_id, seq) already exists before creating a new row.
Source§

async fn claim_activity( &self, task_queue: &str, worker_id: &str, ) -> Result<Option<WorkflowActivity>>

Source§

async fn requeue_activity_for_retry( &self, id: i64, next_attempt: i32, next_scheduled_at: f64, ) -> Result<()>

Re-queue an activity for retry: clears the running state (status→PENDING, claimed_by/started_at cleared), bumps attempt (presumably to next_attempt), and sets scheduled_at to the retry time (presumably next_scheduled_at, i.e. now + backoff) so the next claim_activity won’t pick it up before the backoff elapses.
Source§

async fn complete_activity( &self, id: i64, result: Option<&str>, error: Option<&str>, failed: bool, ) -> Result<()>

Source§

async fn heartbeat_activity( &self, id: i64, _details: Option<&str>, ) -> Result<()>

Source§

async fn get_timed_out_activities( &self, now: f64, ) -> Result<Vec<WorkflowActivity>>

Source§

async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64>

Source§

async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64>

Mark all PENDING activities of a workflow as CANCELLED so workers that haven’t claimed them yet won’t pick them up. Returns the number of rows affected. Does NOT touch RUNNING activities — those will see the cancellation when they next heartbeat or complete.
Source§

async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64>

Mark all unfired timers of a workflow as fired without actually firing them (effectively removing them from the timer poller). Returns the number of rows affected.
Source§

async fn get_timer_by_workflow_seq( &self, workflow_id: &str, seq: i32, ) -> Result<Option<WorkflowTimer>>

Look up an existing timer by its workflow-relative seq. Used by the engine for idempotent ScheduleTimer (deterministic replay can call schedule_timer for the same seq more than once on retries).
Source§

async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>>

Source§

async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64>

Source§

async fn consume_signals( &self, workflow_id: &str, name: &str, ) -> Result<Vec<WorkflowSignal>>

Source§

async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()>

Source§

async fn get_schedule( &self, namespace: &str, name: &str, ) -> Result<Option<WorkflowSchedule>>

Source§

async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>>

Source§

async fn update_schedule_last_run( &self, namespace: &str, name: &str, last_run_at: f64, next_run_at: f64, workflow_id: &str, ) -> Result<()>

Source§

async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool>

Source§

async fn list_archivable_workflows( &self, cutoff: f64, limit: i64, ) -> Result<Vec<WorkflowRecord>>

List workflows in terminal states whose completed_at is older than cutoff and which haven’t been archived yet. Used by the optional S3 archival background task to batch candidates.
Source§

async fn mark_archived_and_purge( &self, workflow_id: &str, archive_uri: &str, archived_at: f64, ) -> Result<()>

Mark a workflow as archived (records archived_at + archive_uri) and purge its events, activities, timers, signals, and snapshots. The workflow record itself is preserved so GET /workflows/{id} still resolves with an archive pointer.
Source§

async fn upsert_search_attributes( &self, workflow_id: &str, patch_json: &str, ) -> Result<()>

Merge a JSON object patch into the workflow’s search_attributes. Keys in the patch overwrite existing keys; keys already present but not in the patch are preserved. If the current column is NULL, the patch becomes the new value.
Source§

async fn update_schedule( &self, namespace: &str, name: &str, patch: &SchedulePatch, ) -> Result<Option<WorkflowSchedule>>

Apply an in-place patch to a schedule. Only fields present on patch are updated; the rest keep their current values. Returns the updated record, or None if the schedule doesn’t exist. Read more
Source§

async fn set_schedule_paused( &self, namespace: &str, name: &str, paused: bool, ) -> Result<Option<WorkflowSchedule>>

Flip a schedule’s paused flag. Returns the updated record, or None if the schedule doesn’t exist. Read more
Source§

async fn register_worker(&self, w: &WorkflowWorker) -> Result<()>

Source§

async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()>

Source§

async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>>

Source§

async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>>

Source§

async fn create_api_key( &self, key_hash: &str, prefix: &str, label: Option<&str>, created_at: f64, ) -> Result<()>

Source§

async fn validate_api_key(&self, key_hash: &str) -> Result<bool>

Source§

async fn list_api_keys(&self) -> Result<Vec<ApiKeyRecord>>

Source§

async fn revoke_api_key(&self, prefix: &str) -> Result<bool>

Source§

async fn list_child_workflows( &self, parent_id: &str, ) -> Result<Vec<WorkflowRecord>>

Source§

async fn create_snapshot( &self, workflow_id: &str, event_seq: i32, state_json: &str, ) -> Result<()>

Source§

async fn get_latest_snapshot( &self, workflow_id: &str, ) -> Result<Option<WorkflowSnapshot>>

Source§

async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<QueueStats>>

Source§

async fn try_acquire_scheduler_lock(&self) -> Result<bool>

Try to acquire the scheduler lock for leader election. Returns true if this instance should run the cron scheduler. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,