Skip to main content

Engine

Struct Engine 

Source
pub struct Engine<S: WorkflowStore> { /* private fields */ }
Expand description

The workflow engine. Owns the store and manages background tasks (scheduler, timer poller, health monitor).

The API layer holds an Arc<Engine<S>> and delegates all operations here.

Implementations§

Source§

impl<S: WorkflowStore> Engine<S>

Source

pub fn start(store: S) -> Self

Start the engine with all background tasks.

Source

pub fn store(&self) -> &S

Access the underlying store (for the API layer).

Source

pub async fn start_workflow( &self, namespace: &str, workflow_type: &str, workflow_id: &str, input: Option<&str>, task_queue: &str, search_attributes: Option<&str>, ) -> Result<WorkflowRecord>

Source

pub async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>>

Source

pub async fn list_workflows( &self, namespace: &str, status: Option<WorkflowStatus>, workflow_type: Option<&str>, search_attrs_filter: Option<&str>, limit: i64, offset: i64, ) -> Result<Vec<WorkflowRecord>>

Source

pub async fn upsert_search_attributes( &self, workflow_id: &str, patch_json: &str, ) -> Result<()>

Source

pub async fn cancel_workflow(&self, id: &str) -> Result<bool>

Source

pub async fn terminate_workflow( &self, id: &str, reason: Option<&str>, ) -> Result<bool>

Source

pub async fn send_signal( &self, workflow_id: &str, name: &str, payload: Option<&str>, ) -> Result<()>

Source

pub async fn get_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>>

Source

pub async fn register_worker(&self, worker: &WorkflowWorker) -> Result<()>

Source

pub async fn heartbeat_worker(&self, id: &str) -> Result<()>

Source

pub async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>>

Source

pub async fn schedule_activity( &self, workflow_id: &str, seq: i32, name: &str, input: Option<&str>, task_queue: &str, opts: ScheduleActivityOpts, ) -> Result<WorkflowActivity>

Schedule an activity within a workflow.

Idempotent on (workflow_id, seq) — if an activity with this sequence number already exists for the workflow, returns its id without creating a duplicate row or duplicate ActivityScheduled event. This is essential for deterministic replay: a worker can re-run the workflow function and call schedule_activity(seq=1, ...) repeatedly without producing side effects.

On the first call for a seq:

  • inserts a row in workflow_activities with status PENDING
  • appends an ActivityScheduled event to the workflow event log
  • if the workflow is still PENDING, transitions it to RUNNING
Source

pub async fn claim_activity( &self, task_queue: &str, worker_id: &str, ) -> Result<Option<WorkflowActivity>>

Source

pub async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>>

Source

pub async fn complete_activity( &self, id: i64, result: Option<&str>, error: Option<&str>, failed: bool, ) -> Result<()>

Mark a successfully-executed activity complete and append an ActivityCompleted event to the workflow event log so a replaying workflow can pick up the cached result.

failed=true is preserved for legacy callers that go straight through complete with a non-retry path; new code should call Engine::fail_activity instead so retry policy is honored.

Source

pub async fn fail_activity(&self, id: i64, error: &str) -> Result<()>

Fail an activity, honoring its retry policy.

If attempt < max_attempts, the activity is re-queued with exponential backoff (initial_interval_secs * backoff_coefficient^(attempt-1)) and attempt is incremented. No event is appended — retries are an internal-engine concern, not workflow-visible.

If attempt >= max_attempts, the activity is permanently FAILED and an ActivityFailed event is appended so the workflow can react.

Source

pub async fn claim_workflow_task( &self, task_queue: &str, worker_id: &str, ) -> Result<Option<(WorkflowRecord, Vec<WorkflowEvent>)>>

Claim a dispatchable workflow task on a queue. Returns the workflow record + full event history so the worker can replay the handler deterministically. Atomic — multiple workers polling the same queue will each get a different task or None.

Source

pub async fn submit_workflow_commands( &self, workflow_id: &str, worker_id: &str, commands: &[Value], ) -> Result<()>

Submit a worker’s batch of commands for a workflow it claimed. Each command produces durable events / rows transactionally and the dispatch lease is released on return.

Supported command types:

  • ScheduleActivity { seq, name, task_queue, input?, max_attempts?, … }
  • CompleteWorkflow { result }
  • FailWorkflow { error }
Source

pub async fn schedule_timer( &self, workflow_id: &str, seq: i32, duration_secs: f64, ) -> Result<WorkflowTimer>

Schedule a durable timer for a workflow.

Idempotent on (workflow_id, seq) — a workflow that yields the same ScheduleTimer{seq=N} on retry will reuse the existing timer, not schedule a second one. This is the timer counterpart to schedule_activity’s replay-safe behaviour.

On the first call:

  • inserts a row in workflow_timers with fire_at = now + duration
  • appends a TimerScheduled event so the worker can replay and know it’s been scheduled (otherwise replays would yield it again)
Source

pub async fn finalise_cancellation(&self, workflow_id: &str) -> Result<()>

Finalise a cancellation: flips status to CANCELLED and appends the terminal WorkflowCancelled event. Called by the CancelWorkflow command handler (worker acknowledged cancel) and by cancel_workflow directly when the workflow has no worker yet.

Source

pub async fn complete_workflow( &self, workflow_id: &str, result: Option<&str>, ) -> Result<()>

Mark a workflow COMPLETED with a result + append WorkflowCompleted event. If the workflow has a parent, also notifies the parent with a ChildWorkflowCompleted event and marks it dispatchable so it can replay past ctx:start_child_workflow and pick up the child’s result.

Source

pub async fn fail_workflow(&self, workflow_id: &str, error: &str) -> Result<()>

Mark a workflow FAILED with an error + append WorkflowFailed event. Notifies the parent if any (ChildWorkflowFailed).

Source

pub async fn heartbeat_activity( &self, id: i64, details: Option<&str>, ) -> Result<()>

Source

pub async fn create_schedule(&self, schedule: &WorkflowSchedule) -> Result<()>

Source

pub async fn list_schedules( &self, namespace: &str, ) -> Result<Vec<WorkflowSchedule>>

Source

pub async fn get_schedule( &self, namespace: &str, name: &str, ) -> Result<Option<WorkflowSchedule>>

Source

pub async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool>

Source

pub async fn update_schedule( &self, namespace: &str, name: &str, patch: &SchedulePatch, ) -> Result<Option<WorkflowSchedule>>

Source

pub async fn set_schedule_paused( &self, namespace: &str, name: &str, paused: bool, ) -> Result<Option<WorkflowSchedule>>

Source

pub async fn create_namespace(&self, name: &str) -> Result<()>

Source

pub async fn list_namespaces(&self) -> Result<Vec<NamespaceRecord>>

Source

pub async fn delete_namespace(&self, name: &str) -> Result<bool>

Source

pub async fn get_namespace_stats( &self, namespace: &str, ) -> Result<NamespaceStats>

Source

pub async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<QueueStats>>

Source

pub async fn start_child_workflow( &self, namespace: &str, parent_id: &str, workflow_type: &str, workflow_id: &str, input: Option<&str>, task_queue: &str, ) -> Result<WorkflowRecord>

Source

pub async fn list_child_workflows( &self, parent_id: &str, ) -> Result<Vec<WorkflowRecord>>

Source

pub async fn continue_as_new( &self, workflow_id: &str, input: Option<&str>, ) -> Result<WorkflowRecord>

Source

pub async fn create_snapshot( &self, workflow_id: &str, event_seq: i32, state_json: &str, ) -> Result<()>

Source

pub async fn get_latest_snapshot( &self, workflow_id: &str, ) -> Result<Option<WorkflowSnapshot>>

Source

pub async fn record_side_effect( &self, workflow_id: &str, value: &str, ) -> Result<()>

Auto Trait Implementations§

§

impl<S> Freeze for Engine<S>

§

impl<S> RefUnwindSafe for Engine<S>
where S: RefUnwindSafe,

§

impl<S> Send for Engine<S>

§

impl<S> Sync for Engine<S>

§

impl<S> Unpin for Engine<S>

§

impl<S> UnsafeUnpin for Engine<S>

§

impl<S> UnwindSafe for Engine<S>
where S: RefUnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,