pub struct Ctx { /* private fields */ }
Context threaded through every workflow and step.
Users never create or manage task IDs. The SDK handles everything
via (parent_id, name) lookups — the unique constraint in the schema
guarantees exactly-once step creation.
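The (parent_id, name) lookup can be pictured with a small in-memory model. This is only a sketch of the idea, not the SDK's schema or code: `TaskTable`, `get_or_create`, and the integer IDs are hypothetical names, and a `HashMap` key stands in for the database unique constraint.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the task table.
/// The map key plays the role of the unique (parent_id, name) constraint.
#[derive(Default)]
pub struct TaskTable {
    by_key: HashMap<(Option<u64>, String), u64>,
    next_id: u64,
}

impl TaskTable {
    /// Returns the existing task ID for (parent_id, name), or creates one.
    /// Calling this twice with the same key never creates a second task.
    pub fn get_or_create(&mut self, parent_id: Option<u64>, name: &str) -> u64 {
        let key = (parent_id, name.to_string());
        if let Some(&id) = self.by_key.get(&key) {
            return id; // resume: the task already exists
        }
        self.next_id += 1;
        let id = self.next_id;
        self.by_key.insert(key, id);
        id
    }

    /// Number of distinct tasks created so far.
    pub fn len(&self) -> usize {
        self.by_key.len()
    }
}
```

Because the caller only ever supplies a parent and a name, re-running the same workflow code resolves to the same task rows, which is why user code never needs to handle IDs directly.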
Implementations
impl Ctx
pub async fn start(
    db: &DatabaseConnection,
    name: &str,
    input: Option<Value>,
) -> Result<Self, DurableError>
Start or resume a root workflow by name.
let ctx = Ctx::start(&db, "ingest", Some(json!({"crawl": "CC-2026"}))).await?;
pub async fn step<T, F, Fut>(&self, name: &str, f: F) -> Result<T, DurableError>
where
    T: Serialize + DeserializeOwned,
    F: FnOnce() -> Fut,
    Fut: Future<Output = Result<T, DurableError>>,
Run a step. If already completed, returns saved output. Otherwise executes the closure, saves the result, and returns it.
This method uses no retries (max_retries=0). For retries, use step_with_retry.
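The replay behaviour described above can be sketched with a synchronous, in-memory model. Everything here is an assumption made for illustration: `Checkpoints` is a hypothetical type, outputs are plain `u32` values rather than serialized data, and the choice to save nothing on error belongs to this sketch rather than to the crate.

```rust
use std::collections::HashMap;

/// Hypothetical checkpoint store: step name -> saved output.
#[derive(Default)]
pub struct Checkpoints {
    saved: HashMap<String, u32>,
}

impl Checkpoints {
    /// Runs `f` only if `name` has no saved output, then saves the result.
    /// In this sketch an error is returned as-is and nothing is saved,
    /// so a later call with the same name runs the closure again.
    pub fn step<F>(&mut self, name: &str, f: F) -> Result<u32, String>
    where
        F: FnOnce() -> Result<u32, String>,
    {
        if let Some(&out) = self.saved.get(name) {
            return Ok(out); // replay: the closure is not called
        }
        let out = f()?;
        self.saved.insert(name.to_string(), out);
        Ok(out)
    }
}
```

`FnOnce` is enough here because the closure runs at most once per call: either the saved output is returned, or the closure executes a single time.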
let count: u32 = ctx.step("fetch_count", || async { api.get_count().await }).await?;
pub async fn transaction<T, F>(
    &self,
    name: &str,
    f: F,
) -> Result<T, DurableError>
where
    T: Serialize + DeserializeOwned + Send,
    F: for<'tx> FnOnce(&'tx DatabaseTransaction) -> Pin<Box<dyn Future<Output = Result<T, DurableError>> + Send + 'tx>> + Send,
Run a DB-only step inside a single Postgres transaction.
Both the user’s DB work and the checkpoint save happen in the same transaction, ensuring atomicity. If the closure returns an error, both the user writes and the checkpoint are rolled back.
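The atomicity guarantee can be illustrated with an in-memory model in which user rows and the checkpoint are staged together and committed in one move. This is a sketch under stated assumptions, not the crate's implementation: `Store` is hypothetical, cloning the state stands in for a Postgres transaction, and the early return for an already-saved checkpoint is assumed to mirror the replay behaviour of `step`.

```rust
use std::collections::HashMap;

/// Hypothetical store holding user rows and step checkpoints together.
#[derive(Default, Clone)]
pub struct Store {
    pub rows: Vec<String>,
    pub checkpoints: HashMap<String, u32>,
}

impl Store {
    /// Applies `f` and the checkpoint write to a staged copy, then commits
    /// both at once. On error the staged copy is dropped (rollback), so
    /// neither the user writes nor the checkpoint become visible.
    pub fn transaction<F>(&mut self, name: &str, f: F) -> Result<u32, String>
    where
        F: FnOnce(&mut Vec<String>) -> Result<u32, String>,
    {
        if let Some(&out) = self.checkpoints.get(name) {
            return Ok(out); // assumed replay: already committed earlier
        }
        let mut staged = self.clone();
        let out = f(&mut staged.rows)?; // Err: `staged` is discarded here
        staged.checkpoints.insert(name.to_string(), out);
        *self = staged; // commit rows and checkpoint together
        Ok(out)
    }
}
```

The point of the model is that there is no observable state in which the rows were written but the checkpoint was not, or the reverse.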
let count: u32 = ctx.transaction("upsert_batch", |tx| Box::pin(async move {
    do_db_work(tx).await
})).await?;
pub async fn child(
    &self,
    name: &str,
    input: Option<Value>,
) -> Result<Self, DurableError>
Start or resume a child workflow. Returns a new Ctx scoped to the child.
let child_ctx = ctx.child("embed_batch", Some(json!({"vectors": 1000}))).await?;
// use child_ctx.step(...) for steps inside the child
child_ctx.complete(&json!({"done": true})).await?;
pub async fn is_completed(&self) -> Result<bool, DurableError>
Check whether this workflow or child has already completed, so that a parent can skip it.
pub async fn get_output<T: DeserializeOwned>(
    &self,
) -> Result<Option<T>, DurableError>
Get the saved output if this task is completed.
pub async fn complete<T: Serialize>(
    &self,
    output: &T,
) -> Result<(), DurableError>
Mark this workflow as completed with an output value.
pub async fn step_with_retry<T, F, Fut>(
    &self,
    name: &str,
    policy: RetryPolicy,
    f: F,
) -> Result<T, DurableError>
where
    T: Serialize + DeserializeOwned,
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, DurableError>>,
Run a step with a configurable retry policy.
Unlike step(), the closure must implement Fn (not FnOnce) since
it may be called multiple times on retry. Retries happen in-process with
configurable backoff between attempts.
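An in-process retry loop with backoff might look roughly like the following synchronous sketch. The `Backoff` struct, its fields, and the doubling rule are assumptions chosen for illustration; the actual semantics of `RetryPolicy` are not described in this page.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical policy: up to `max_retries` retries after the first attempt,
/// doubling the delay each time, starting from `base_delay`.
pub struct Backoff {
    pub max_retries: u32,
    pub base_delay: Duration,
}

/// Calls `f` until it succeeds or the retries are exhausted.
/// `f` must be `Fn` (not `FnOnce`) because it may run more than once.
pub fn run_with_retry<T, E, F>(policy: &Backoff, f: F) -> Result<T, E>
where
    F: Fn() -> Result<T, E>,
{
    let mut delay = policy.base_delay;
    let mut attempt: u32 = 0;
    loop {
        match f() {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error to the caller.
            Err(err) if attempt >= policy.max_retries => return Err(err),
            Err(_) => {
                sleep(delay); // wait in-process before the next attempt
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

With `max_retries = 3` the closure runs at most four times: the initial attempt plus three retries.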
let result: u32 = ctx
    .step_with_retry("call_api", RetryPolicy::exponential(3, Duration::from_secs(1)), || async {
        api.call().await
    })
    .await?;
pub async fn fail(&self, error: &str) -> Result<(), DurableError>
Mark this workflow as failed.
pub async fn set_timeout(&self, timeout_ms: i64) -> Result<(), DurableError>
Set the timeout for this task in milliseconds.
If a task stays RUNNING longer than timeout_ms, it will be eligible
for recovery by Executor::recover().
If the task is currently RUNNING and has a started_at, this also
computes and stores deadline_epoch_ms = started_at_epoch_ms + timeout_ms.
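The deadline arithmetic above is simple enough to write out. The helper names below are hypothetical, and treating a non-positive timeout or an overflow as "no deadline" is a choice made for this sketch, not documented behaviour.

```rust
/// Computes deadline_epoch_ms = started_at_epoch_ms + timeout_ms.
/// Returns None on overflow or a non-positive timeout (a sketch-only choice).
pub fn deadline_epoch_ms(started_at_epoch_ms: i64, timeout_ms: i64) -> Option<i64> {
    if timeout_ms <= 0 {
        return None;
    }
    started_at_epoch_ms.checked_add(timeout_ms)
}

/// True once `now_epoch_ms` is past the deadline, i.e. the task has been
/// RUNNING longer than its timeout.
pub fn is_overdue(deadline_epoch_ms: i64, now_epoch_ms: i64) -> bool {
    now_epoch_ms > deadline_epoch_ms
}
```

For example, a task started at epoch millisecond 1000 with a 500 ms timeout gets a deadline of 1500, and counts as overdue from 1501 onward in this model.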
pub async fn start_with_timeout(
    db: &DatabaseConnection,
    name: &str,
    input: Option<Value>,
    timeout_ms: i64,
) -> Result<Self, DurableError>
Start or resume a root workflow with a timeout in milliseconds.
Equivalent to calling Ctx::start() followed by ctx.set_timeout(timeout_ms).
pub async fn pause(
    db: &DatabaseConnection,
    task_id: Uuid,
) -> Result<(), DurableError>
Pause a workflow by ID. Sets status to PAUSED and recursively cascades to all PENDING/RUNNING descendants (children, grandchildren, etc.).
Only workflows in PENDING or RUNNING status can be paused.
pub async fn resume(
    db: &DatabaseConnection,
    task_id: Uuid,
) -> Result<(), DurableError>
Resume a paused workflow by ID. Sets status back to RUNNING and recursively cascades to all PAUSED descendants (resetting them to PENDING).
pub async fn cancel(
    db: &DatabaseConnection,
    task_id: Uuid,
) -> Result<(), DurableError>
Cancel a workflow by ID. Sets status to CANCELLED and recursively cascades to all non-terminal descendants.
Cancellation is terminal — a cancelled workflow cannot be resumed.
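The pause, resume, and cancel rules can be modelled as status transitions over a task tree. The following is an in-memory sketch, not the crate's SQL: `Task`, `Status`, and the free functions are hypothetical, the set of terminal states (COMPLETED, FAILED, CANCELLED) is an assumption, and rejecting a cancel on an already-terminal root is a choice made here.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Status { Pending, Running, Paused, Completed, Failed, Cancelled }

/// Hypothetical in-memory task tree node.
pub struct Task {
    pub status: Status,
    pub children: Vec<Task>,
}

/// Assumed terminal states for this sketch.
fn is_terminal(s: Status) -> bool {
    matches!(s, Status::Completed | Status::Failed | Status::Cancelled)
}

/// Visits every descendant of `task` and maps its status through `f`.
fn for_each_descendant(task: &mut Task, f: &dyn Fn(Status) -> Status) {
    for child in &mut task.children {
        child.status = f(child.status);
        for_each_descendant(child, f);
    }
}

/// PENDING/RUNNING root -> PAUSED, cascading to PENDING/RUNNING descendants.
pub fn pause(root: &mut Task) -> Result<(), String> {
    if !matches!(root.status, Status::Pending | Status::Running) {
        return Err(format!("cannot pause a {:?} task", root.status));
    }
    root.status = Status::Paused;
    for_each_descendant(root, &|s: Status| match s {
        Status::Pending | Status::Running => Status::Paused,
        other => other,
    });
    Ok(())
}

/// PAUSED root -> RUNNING; PAUSED descendants are reset to PENDING.
pub fn resume(root: &mut Task) -> Result<(), String> {
    if root.status != Status::Paused {
        return Err(format!("cannot resume a {:?} task", root.status));
    }
    root.status = Status::Running;
    for_each_descendant(root, &|s: Status| {
        if s == Status::Paused { Status::Pending } else { s }
    });
    Ok(())
}

/// Non-terminal root -> CANCELLED, cascading to non-terminal descendants.
pub fn cancel(root: &mut Task) -> Result<(), String> {
    if is_terminal(root.status) {
        return Err(format!("cannot cancel a {:?} task", root.status));
    }
    root.status = Status::Cancelled;
    for_each_descendant(root, &|s: Status| {
        if is_terminal(s) { s } else { Status::Cancelled }
    });
    Ok(())
}
```

Note the asymmetry in this model: a descendant that was RUNNING when paused comes back as PENDING after resume, and a cancelled tree rejects any later resume because its root is no longer PAUSED.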
pub async fn list(
    db: &DatabaseConnection,
    query: TaskQuery,
) -> Result<Vec<TaskSummary>, DurableError>
List tasks matching the given filter, with sorting and pagination.
let tasks = Ctx::list(&db, TaskQuery::default().status("RUNNING").limit(10)).await?;
pub async fn count(
    db: &DatabaseConnection,
    query: TaskQuery,
) -> Result<u64, DurableError>
Count tasks matching the given filter.
pub fn db(&self) -> &DatabaseConnection
pub fn task_id(&self) -> Uuid
pub fn next_sequence(&self) -> i32
Auto Trait Implementations
impl !Freeze for Ctx
impl !RefUnwindSafe for Ctx
impl Send for Ctx
impl Sync for Ctx
impl Unpin for Ctx
impl UnsafeUnpin for Ctx
impl !UnwindSafe for Ctx
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.