Skip to main content

WorkflowStore

Trait WorkflowStore 

Source
pub trait WorkflowStore:
    Send
    + Sync
    + 'static {
Show 54 methods // Required methods fn create_namespace( &self, name: &str, ) -> impl Future<Output = Result<()>> + Send; fn list_namespaces( &self, ) -> impl Future<Output = Result<Vec<NamespaceRecord>>> + Send; fn delete_namespace( &self, name: &str, ) -> impl Future<Output = Result<bool>> + Send; fn get_namespace_stats( &self, namespace: &str, ) -> impl Future<Output = Result<NamespaceStats>> + Send; fn create_workflow( &self, workflow: &WorkflowRecord, ) -> impl Future<Output = Result<()>> + Send; fn get_workflow( &self, id: &str, ) -> impl Future<Output = Result<Option<WorkflowRecord>>> + Send; fn list_workflows( &self, namespace: &str, status: Option<WorkflowStatus>, workflow_type: Option<&str>, search_attrs_filter: Option<&str>, limit: i64, offset: i64, ) -> impl Future<Output = Result<Vec<WorkflowRecord>>> + Send; fn list_archivable_workflows( &self, cutoff: f64, limit: i64, ) -> impl Future<Output = Result<Vec<WorkflowRecord>>> + Send; fn mark_archived_and_purge( &self, workflow_id: &str, archive_uri: &str, archived_at: f64, ) -> impl Future<Output = Result<()>> + Send; fn upsert_search_attributes( &self, workflow_id: &str, patch_json: &str, ) -> impl Future<Output = Result<()>> + Send; fn update_workflow_status( &self, id: &str, status: WorkflowStatus, result: Option<&str>, error: Option<&str>, ) -> impl Future<Output = Result<()>> + Send; fn claim_workflow( &self, id: &str, worker_id: &str, ) -> impl Future<Output = Result<bool>> + Send; fn mark_workflow_dispatchable( &self, workflow_id: &str, ) -> impl Future<Output = Result<()>> + Send; fn claim_workflow_task( &self, task_queue: &str, worker_id: &str, ) -> impl Future<Output = Result<Option<WorkflowRecord>>> + Send; fn release_workflow_task( &self, workflow_id: &str, worker_id: &str, ) -> impl Future<Output = Result<()>> + Send; fn release_stale_dispatch_leases( &self, now: f64, timeout_secs: f64, ) -> impl Future<Output = Result<u64>> + Send; fn append_event( &self, event: &WorkflowEvent, ) -> impl 
Future<Output = Result<i64>> + Send; fn list_events( &self, workflow_id: &str, ) -> impl Future<Output = Result<Vec<WorkflowEvent>>> + Send; fn get_event_count( &self, workflow_id: &str, ) -> impl Future<Output = Result<i64>> + Send; fn create_activity( &self, activity: &WorkflowActivity, ) -> impl Future<Output = Result<i64>> + Send; fn get_activity( &self, id: i64, ) -> impl Future<Output = Result<Option<WorkflowActivity>>> + Send; fn get_activity_by_workflow_seq( &self, workflow_id: &str, seq: i32, ) -> impl Future<Output = Result<Option<WorkflowActivity>>> + Send; fn claim_activity( &self, task_queue: &str, worker_id: &str, ) -> impl Future<Output = Result<Option<WorkflowActivity>>> + Send; fn requeue_activity_for_retry( &self, id: i64, next_attempt: i32, next_scheduled_at: f64, ) -> impl Future<Output = Result<()>> + Send; fn complete_activity( &self, id: i64, result: Option<&str>, error: Option<&str>, failed: bool, ) -> impl Future<Output = Result<()>> + Send; fn heartbeat_activity( &self, id: i64, details: Option<&str>, ) -> impl Future<Output = Result<()>> + Send; fn get_timed_out_activities( &self, now: f64, ) -> impl Future<Output = Result<Vec<WorkflowActivity>>> + Send; fn cancel_pending_activities( &self, workflow_id: &str, ) -> impl Future<Output = Result<u64>> + Send; fn cancel_pending_timers( &self, workflow_id: &str, ) -> impl Future<Output = Result<u64>> + Send; fn create_timer( &self, timer: &WorkflowTimer, ) -> impl Future<Output = Result<i64>> + Send; fn get_timer_by_workflow_seq( &self, workflow_id: &str, seq: i32, ) -> impl Future<Output = Result<Option<WorkflowTimer>>> + Send; fn fire_due_timers( &self, now: f64, ) -> impl Future<Output = Result<Vec<WorkflowTimer>>> + Send; fn send_signal( &self, signal: &WorkflowSignal, ) -> impl Future<Output = Result<i64>> + Send; fn consume_signals( &self, workflow_id: &str, name: &str, ) -> impl Future<Output = Result<Vec<WorkflowSignal>>> + Send; fn create_schedule( &self, schedule: &WorkflowSchedule, ) 
-> impl Future<Output = Result<()>> + Send; fn get_schedule( &self, namespace: &str, name: &str, ) -> impl Future<Output = Result<Option<WorkflowSchedule>>> + Send; fn list_schedules( &self, namespace: &str, ) -> impl Future<Output = Result<Vec<WorkflowSchedule>>> + Send; fn update_schedule_last_run( &self, namespace: &str, name: &str, last_run_at: f64, next_run_at: f64, workflow_id: &str, ) -> impl Future<Output = Result<()>> + Send; fn delete_schedule( &self, namespace: &str, name: &str, ) -> impl Future<Output = Result<bool>> + Send; fn update_schedule( &self, namespace: &str, name: &str, patch: &SchedulePatch, ) -> impl Future<Output = Result<Option<WorkflowSchedule>>> + Send; fn set_schedule_paused( &self, namespace: &str, name: &str, paused: bool, ) -> impl Future<Output = Result<Option<WorkflowSchedule>>> + Send; fn register_worker( &self, worker: &WorkflowWorker, ) -> impl Future<Output = Result<()>> + Send; fn heartbeat_worker( &self, id: &str, now: f64, ) -> impl Future<Output = Result<()>> + Send; fn list_workers( &self, namespace: &str, ) -> impl Future<Output = Result<Vec<WorkflowWorker>>> + Send; fn remove_dead_workers( &self, cutoff: f64, ) -> impl Future<Output = Result<Vec<String>>> + Send; fn create_api_key( &self, key_hash: &str, prefix: &str, label: Option<&str>, created_at: f64, ) -> impl Future<Output = Result<()>> + Send; fn validate_api_key( &self, key_hash: &str, ) -> impl Future<Output = Result<bool>> + Send; fn list_api_keys( &self, ) -> impl Future<Output = Result<Vec<ApiKeyRecord>>> + Send; fn revoke_api_key( &self, prefix: &str, ) -> impl Future<Output = Result<bool>> + Send; fn list_child_workflows( &self, parent_id: &str, ) -> impl Future<Output = Result<Vec<WorkflowRecord>>> + Send; fn create_snapshot( &self, workflow_id: &str, event_seq: i32, state_json: &str, ) -> impl Future<Output = Result<()>> + Send; fn get_latest_snapshot( &self, workflow_id: &str, ) -> impl Future<Output = Result<Option<WorkflowSnapshot>>> + Send; fn 
get_queue_stats( &self, namespace: &str, ) -> impl Future<Output = Result<Vec<QueueStats>>> + Send; fn try_acquire_scheduler_lock( &self, ) -> impl Future<Output = Result<bool>> + Send;
}
Expand description

Core storage trait for the workflow engine.

All database access goes through this trait. Methods that operate on namespace-scoped data take a namespace parameter. The “main” namespace is always available.

All methods return Send futures so they can be used from tokio::spawn.

Required Methods§

Source

fn create_namespace( &self, name: &str, ) -> impl Future<Output = Result<()>> + Send

Source

fn list_namespaces( &self, ) -> impl Future<Output = Result<Vec<NamespaceRecord>>> + Send

Source

fn delete_namespace( &self, name: &str, ) -> impl Future<Output = Result<bool>> + Send

Source

fn get_namespace_stats( &self, namespace: &str, ) -> impl Future<Output = Result<NamespaceStats>> + Send

Source

fn create_workflow( &self, workflow: &WorkflowRecord, ) -> impl Future<Output = Result<()>> + Send

Source

fn get_workflow( &self, id: &str, ) -> impl Future<Output = Result<Option<WorkflowRecord>>> + Send

Source

fn list_workflows( &self, namespace: &str, status: Option<WorkflowStatus>, workflow_type: Option<&str>, search_attrs_filter: Option<&str>, limit: i64, offset: i64, ) -> impl Future<Output = Result<Vec<WorkflowRecord>>> + Send

Source

fn list_archivable_workflows( &self, cutoff: f64, limit: i64, ) -> impl Future<Output = Result<Vec<WorkflowRecord>>> + Send

List workflows in terminal states whose completed_at is older than cutoff and which haven’t been archived yet. Used by the optional S3 archival background task to batch candidates.

Source

fn mark_archived_and_purge( &self, workflow_id: &str, archive_uri: &str, archived_at: f64, ) -> impl Future<Output = Result<()>> + Send

Mark a workflow as archived (records archived_at + archive_uri) and purge its events, activities, timers, signals, and snapshots. The workflow record itself is preserved so GET /workflows/{id} still resolves with an archive pointer.

Source

fn upsert_search_attributes( &self, workflow_id: &str, patch_json: &str, ) -> impl Future<Output = Result<()>> + Send

Merge a JSON object patch into the workflow’s search_attributes. Keys in the patch overwrite existing keys; keys already present but not in the patch are preserved. If the current column is NULL, the patch becomes the new value.

Source

fn update_workflow_status( &self, id: &str, status: WorkflowStatus, result: Option<&str>, error: Option<&str>, ) -> impl Future<Output = Result<()>> + Send

Source

fn claim_workflow( &self, id: &str, worker_id: &str, ) -> impl Future<Output = Result<bool>> + Send

Source

fn mark_workflow_dispatchable( &self, workflow_id: &str, ) -> impl Future<Output = Result<()>> + Send

Mark a workflow as having new events that need a worker to replay it. Idempotent — calling it repeatedly is fine. The needs_dispatch flag set here is cleared by claim_workflow_task.

Source

fn claim_workflow_task( &self, task_queue: &str, worker_id: &str, ) -> impl Future<Output = Result<Option<WorkflowRecord>>> + Send

Atomically claim the oldest dispatchable workflow on a queue. Sets dispatch_claimed_by and dispatch_last_heartbeat, clears needs_dispatch. Returns the workflow record or None if nothing is available.

Source

fn release_workflow_task( &self, workflow_id: &str, worker_id: &str, ) -> impl Future<Output = Result<()>> + Send

Release a workflow task’s dispatch lease (called when the worker submits its commands batch). Only succeeds if dispatch_claimed_by matches the calling worker.

Source

fn release_stale_dispatch_leases( &self, now: f64, timeout_secs: f64, ) -> impl Future<Output = Result<u64>> + Send

Forcibly release dispatch leases whose worker hasn’t sent a heartbeat within timeout_secs. Used by the engine’s background poller to recover from worker crashes. Returns the number of leases released (each becomes claimable again, with needs_dispatch=true).

Source

fn append_event( &self, event: &WorkflowEvent, ) -> impl Future<Output = Result<i64>> + Send

Source

fn list_events( &self, workflow_id: &str, ) -> impl Future<Output = Result<Vec<WorkflowEvent>>> + Send

Source

fn get_event_count( &self, workflow_id: &str, ) -> impl Future<Output = Result<i64>> + Send

Source

fn create_activity( &self, activity: &WorkflowActivity, ) -> impl Future<Output = Result<i64>> + Send

Source

fn get_activity( &self, id: i64, ) -> impl Future<Output = Result<Option<WorkflowActivity>>> + Send

Look up an activity by its primary key.

Source

fn get_activity_by_workflow_seq( &self, workflow_id: &str, seq: i32, ) -> impl Future<Output = Result<Option<WorkflowActivity>>> + Send

Look up an activity by its workflow-relative sequence number. Used for idempotent scheduling: the engine checks if (workflow_id, seq) already exists before creating a new row.

Source

fn claim_activity( &self, task_queue: &str, worker_id: &str, ) -> impl Future<Output = Result<Option<WorkflowActivity>>> + Send

Source

fn requeue_activity_for_retry( &self, id: i64, next_attempt: i32, next_scheduled_at: f64, ) -> impl Future<Output = Result<()>> + Send

Re-queue an activity for retry: clears the running state (status→PENDING, claimed_by/started_at cleared), sets attempt to next_attempt, and sets scheduled_at to next_scheduled_at (which the caller is expected to compute as now + backoff) so the next claim_activity won’t pick it up before the backoff elapses.

Source

fn complete_activity( &self, id: i64, result: Option<&str>, error: Option<&str>, failed: bool, ) -> impl Future<Output = Result<()>> + Send

Source

fn heartbeat_activity( &self, id: i64, details: Option<&str>, ) -> impl Future<Output = Result<()>> + Send

Source

fn get_timed_out_activities( &self, now: f64, ) -> impl Future<Output = Result<Vec<WorkflowActivity>>> + Send

Source

fn cancel_pending_activities( &self, workflow_id: &str, ) -> impl Future<Output = Result<u64>> + Send

Mark all PENDING activities of a workflow as CANCELLED so workers that haven’t claimed them yet won’t pick them up. Returns the number of rows affected. Does NOT touch RUNNING activities — those will see the cancellation when they next heartbeat or complete.

Source

fn cancel_pending_timers( &self, workflow_id: &str, ) -> impl Future<Output = Result<u64>> + Send

Mark all unfired timers of a workflow as fired without actually firing them, so the timer poller no longer picks them up. Returns the number of rows affected.

Source

fn create_timer( &self, timer: &WorkflowTimer, ) -> impl Future<Output = Result<i64>> + Send

Source

fn get_timer_by_workflow_seq( &self, workflow_id: &str, seq: i32, ) -> impl Future<Output = Result<Option<WorkflowTimer>>> + Send

Look up an existing timer by its workflow-relative seq. Used by the engine for idempotent ScheduleTimer (deterministic replay can call schedule_timer for the same seq more than once on retries).

Source

fn fire_due_timers( &self, now: f64, ) -> impl Future<Output = Result<Vec<WorkflowTimer>>> + Send

Source

fn send_signal( &self, signal: &WorkflowSignal, ) -> impl Future<Output = Result<i64>> + Send

Source

fn consume_signals( &self, workflow_id: &str, name: &str, ) -> impl Future<Output = Result<Vec<WorkflowSignal>>> + Send

Source

fn create_schedule( &self, schedule: &WorkflowSchedule, ) -> impl Future<Output = Result<()>> + Send

Source

fn get_schedule( &self, namespace: &str, name: &str, ) -> impl Future<Output = Result<Option<WorkflowSchedule>>> + Send

Source

fn list_schedules( &self, namespace: &str, ) -> impl Future<Output = Result<Vec<WorkflowSchedule>>> + Send

Source

fn update_schedule_last_run( &self, namespace: &str, name: &str, last_run_at: f64, next_run_at: f64, workflow_id: &str, ) -> impl Future<Output = Result<()>> + Send

Source

fn delete_schedule( &self, namespace: &str, name: &str, ) -> impl Future<Output = Result<bool>> + Send

Source

fn update_schedule( &self, namespace: &str, name: &str, patch: &SchedulePatch, ) -> impl Future<Output = Result<Option<WorkflowSchedule>>> + Send

Apply an in-place patch to a schedule. Only fields present on patch are updated; the rest keep their current values. Returns the updated record, or None if the schedule doesn’t exist.

The scheduler’s next_run_at is recomputed from the new cron_expr + timezone on the next evaluation tick, so a PATCH takes effect within the scheduler’s poll interval.

Source

fn set_schedule_paused( &self, namespace: &str, name: &str, paused: bool, ) -> impl Future<Output = Result<Option<WorkflowSchedule>>> + Send

Flip a schedule’s paused flag. Returns the updated record, or None if the schedule doesn’t exist.

A paused schedule is skipped by the scheduler; resuming it doesn’t backfill missed fires — the next fire is whatever the cron expression says, starting from now.

Source

fn register_worker( &self, worker: &WorkflowWorker, ) -> impl Future<Output = Result<()>> + Send

Source

fn heartbeat_worker( &self, id: &str, now: f64, ) -> impl Future<Output = Result<()>> + Send

Source

fn list_workers( &self, namespace: &str, ) -> impl Future<Output = Result<Vec<WorkflowWorker>>> + Send

Source

fn remove_dead_workers( &self, cutoff: f64, ) -> impl Future<Output = Result<Vec<String>>> + Send

Source

fn create_api_key( &self, key_hash: &str, prefix: &str, label: Option<&str>, created_at: f64, ) -> impl Future<Output = Result<()>> + Send

Source

fn validate_api_key( &self, key_hash: &str, ) -> impl Future<Output = Result<bool>> + Send

Source

fn list_api_keys( &self, ) -> impl Future<Output = Result<Vec<ApiKeyRecord>>> + Send

Source

fn revoke_api_key( &self, prefix: &str, ) -> impl Future<Output = Result<bool>> + Send

Source

fn list_child_workflows( &self, parent_id: &str, ) -> impl Future<Output = Result<Vec<WorkflowRecord>>> + Send

Source

fn create_snapshot( &self, workflow_id: &str, event_seq: i32, state_json: &str, ) -> impl Future<Output = Result<()>> + Send

Source

fn get_latest_snapshot( &self, workflow_id: &str, ) -> impl Future<Output = Result<Option<WorkflowSnapshot>>> + Send

Source

fn get_queue_stats( &self, namespace: &str, ) -> impl Future<Output = Result<Vec<QueueStats>>> + Send

Source

fn try_acquire_scheduler_lock( &self, ) -> impl Future<Output = Result<bool>> + Send

Try to acquire the scheduler lock for leader election. Returns true if this instance should run the cron scheduler.

  • SQLite: always returns true (single-instance assumed)
  • Postgres: uses pg_try_advisory_lock (only one instance wins)

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§