Skip to main content

Ctx

Struct Ctx 

Source
pub struct Ctx { /* private fields */ }
Expand description

Context threaded through every workflow and step.

Users never create or manage task IDs. The SDK handles everything via (parent_id, name) lookups — the unique constraint in the schema guarantees exactly-once step creation.

Implementations§

Source§

impl Ctx

Source

pub async fn start( db: &DatabaseConnection, name: &str, input: Option<Value>, ) -> Result<Self, DurableError>

Start or resume a root workflow by name.

let ctx = Ctx::start(&db, "ingest", json!({"crawl": "CC-2026"})).await?;
Source

pub async fn step<T, F, Fut>(&self, name: &str, f: F) -> Result<T, DurableError>
where T: Serialize + DeserializeOwned, F: FnOnce() -> Fut, Fut: Future<Output = Result<T, DurableError>>,

Run a step. If already completed, returns saved output. Otherwise executes the closure, saves the result, and returns it.

This method uses no retries (max_retries=0). For retries, use step_with_retry.

let count: u32 = ctx.step("fetch_count", || async { api.get_count().await }).await?;
Source

pub async fn transaction<T, F>( &self, name: &str, f: F, ) -> Result<T, DurableError>
where T: Serialize + DeserializeOwned + Send, F: for<'tx> FnOnce(&'tx DatabaseTransaction) -> Pin<Box<dyn Future<Output = Result<T, DurableError>> + Send + 'tx>> + Send,

Run a DB-only step inside a single Postgres transaction.

Both the user’s DB work and the checkpoint save happen in the same transaction, ensuring atomicity. If the closure returns an error, both the user writes and the checkpoint are rolled back.

let count: u32 = ctx.transaction("upsert_batch", |tx| Box::pin(async move {
    do_db_work(tx).await
})).await?;
Source

pub async fn child( &self, name: &str, input: Option<Value>, ) -> Result<Self, DurableError>

Start or resume a child workflow. Returns a new Ctx scoped to the child.

let child_ctx = ctx.child("embed_batch", json!({"vectors": 1000})).await?;
// use child_ctx.step(...) for steps inside the child
child_ctx.complete(json!({"done": true})).await?;
Source

pub async fn is_completed(&self) -> Result<bool, DurableError>

Check if this workflow/child was already completed (for skipping in parent).

Source

pub async fn get_output<T: DeserializeOwned>( &self, ) -> Result<Option<T>, DurableError>

Get the saved output if this task is completed.

Source

pub async fn complete<T: Serialize>( &self, output: &T, ) -> Result<(), DurableError>

Mark this workflow as completed with an output value.

Source

pub async fn step_with_retry<T, F, Fut>( &self, name: &str, policy: RetryPolicy, f: F, ) -> Result<T, DurableError>
where T: Serialize + DeserializeOwned, F: Fn() -> Fut, Fut: Future<Output = Result<T, DurableError>>,

Run a step with a configurable retry policy.

Unlike step(), the closure must implement Fn (not FnOnce) since it may be called multiple times on retry. Retries happen in-process with configurable backoff between attempts.

let result: u32 = ctx
    .step_with_retry("call_api", RetryPolicy::exponential(3, Duration::from_secs(1)), || async {
        api.call().await
    })
    .await?;
Source

pub async fn fail(&self, error: &str) -> Result<(), DurableError>

Mark this workflow as failed.

Source

pub async fn set_timeout(&self, timeout_ms: i64) -> Result<(), DurableError>

Set the timeout for this task in milliseconds.

If a task stays RUNNING longer than timeout_ms, it will be eligible for recovery by Executor::recover().

If the task is currently RUNNING and has a started_at, this also computes and stores deadline_epoch_ms = started_at_epoch_ms + timeout_ms.

Source

pub async fn start_with_timeout( db: &DatabaseConnection, name: &str, input: Option<Value>, timeout_ms: i64, ) -> Result<Self, DurableError>

Start or resume a root workflow with a timeout in milliseconds.

Equivalent to calling Ctx::start() followed by ctx.set_timeout(timeout_ms).

Source

pub async fn pause( db: &DatabaseConnection, task_id: Uuid, ) -> Result<(), DurableError>

Pause a workflow by ID. Sets status to PAUSED and recursively cascades to all PENDING/RUNNING descendants (children, grandchildren, etc.).

Only workflows in PENDING or RUNNING status can be paused.

Source

pub async fn resume( db: &DatabaseConnection, task_id: Uuid, ) -> Result<(), DurableError>

Resume a paused workflow by ID. Sets status back to RUNNING and recursively cascades to all PAUSED descendants (resetting them to PENDING).

Source

pub async fn cancel( db: &DatabaseConnection, task_id: Uuid, ) -> Result<(), DurableError>

Cancel a workflow by ID. Sets status to CANCELLED and recursively cascades to all non-terminal descendants.

Cancellation is terminal — a cancelled workflow cannot be resumed.

Source

pub async fn list( db: &DatabaseConnection, query: TaskQuery, ) -> Result<Vec<TaskSummary>, DurableError>

List tasks matching the given filter, with sorting and pagination.

let tasks = Ctx::list(&db, TaskQuery::default().status("RUNNING").limit(10)).await?;
Source

pub async fn count( db: &DatabaseConnection, query: TaskQuery, ) -> Result<u64, DurableError>

Count tasks matching the given filter.

Source

pub fn db(&self) -> &DatabaseConnection

Source

pub fn task_id(&self) -> Uuid

Source

pub fn next_sequence(&self) -> i32

Auto Trait Implementations§

§

impl !Freeze for Ctx

§

impl !RefUnwindSafe for Ctx

§

impl Send for Ctx

§

impl Sync for Ctx

§

impl Unpin for Ctx

§

impl UnsafeUnpin for Ctx

§

impl !UnwindSafe for Ctx

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more