Skip to main content

durable/
lib.rs

1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14// Re-export so macro-generated code can reference `durable::inventory::submit!`
15pub use inventory;
16
17use sea_orm::{ConnectOptions, Database, DatabaseConnection};
18use sea_orm_migration::MigratorTrait;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::RwLock;
22use uuid::Uuid;
23
/// Global executor ID set by [`init`]. Read by [`Ctx::start`] to tag tasks
/// so heartbeat-based recovery can find them after a crash.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns the executor ID set by [`init`], or `None` if `init` was not called.
pub fn executor_id() -> Option<String> {
    match EXECUTOR_ID.read() {
        Ok(guard) => (*guard).clone(),
        // A poisoned lock is treated the same as "never initialized".
        Err(_) => None,
    }
}
32
33// ── Workflow auto-registration ──────────────────────────────────
34
/// A compiled-in registration of a workflow function for automatic crash recovery.
///
/// Produced by `#[durable::workflow]` and collected at link time by `inventory`.
/// Only workflows with a single `ctx: Ctx` parameter are registered.
pub struct WorkflowRegistration {
    /// The handler name — matches the function name, used for recovery lookup
    /// via the `handler` column in `durable.task`.
    pub name: &'static str,
    /// Resumes the workflow given a `Ctx`. Return type is erased; during
    /// recovery we only care about driving the workflow to completion.
    /// The boxed future is `Send` so it can be driven from a spawned task.
    pub resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
}

// Link-time registry: every `#[durable::workflow]` expansion submits one
// `WorkflowRegistration`; [`find_workflow`] scans this collection.
inventory::collect!(WorkflowRegistration);
49
50/// Look up a registered workflow by name.
51pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
52    inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
53}
54
55// ── Initialization ──────────────────────────────────────────────
56
57/// Resume a failed workflow from its last completed step (DBOS-style).
58///
59/// Resets the workflow from FAILED → RUNNING, resets failed child steps to
60/// PENDING, and re-dispatches the registered workflow function. Completed
61/// steps replay from their checkpointed outputs.
62///
63/// If no `WorkflowRegistration` is found for the workflow's handler, the
64/// workflow is still reset but not auto-dispatched — use `Ctx::from_id()`
65/// to drive it manually.
66///
67/// ```ignore
68/// durable::resume_workflow(&db, workflow_id).await?;
69/// ```
70pub async fn resume_workflow(
71    db: &DatabaseConnection,
72    task_id: uuid::Uuid,
73) -> Result<(), DurableError> {
74    let handler = Ctx::resume_failed(db, task_id).await?;
75
76    // Look up registered handler and auto-dispatch with recovery
77    let lookup_key = handler.as_deref().unwrap_or("");
78    if let Some(reg) = find_workflow(lookup_key) {
79        let db_inner = db.clone();
80        let resume = reg.resume_fn;
81        tokio::spawn(async move {
82            tracing::info!(
83                id = %task_id,
84                "re-dispatching resumed workflow"
85            );
86            run_workflow_with_recovery(db_inner, task_id, resume).await;
87        });
88    } else {
89        tracing::warn!(
90            id = %task_id,
91            handler = ?handler,
92            "no registered handler for resumed workflow — use Ctx::from_id() to resume manually"
93        );
94    }
95
96    Ok(())
97}
98
99/// Initialize durable: connect to Postgres, run migrations, start heartbeat,
100/// recover stale tasks, and auto-resume registered workflows.
101///
102/// After this call, [`Ctx::start`] automatically tags tasks with the executor
103/// ID for crash recovery. Recovered root workflows that have a matching
104/// `#[durable::workflow]` registration are automatically spawned as tokio
105/// tasks.
106///
107/// Uses [`HeartbeatConfig::default()`] (60 s heartbeat, 180 s staleness).
108/// For custom intervals use [`init_with_config`].
109///
110/// ```ignore
111/// let (db, recovered) = durable::init("postgres://localhost/mydb").await?;
112/// ```
113pub async fn init(
114    database_url: &str,
115) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
116    init_with_config(database_url, HeartbeatConfig::default()).await
117}
118
/// Like [`init`] but with a custom [`HeartbeatConfig`].
///
/// Startup sequence:
/// 1. Connect (search path `public,durable`) and run migrations.
/// 2. Generate a process-unique executor ID and publish it in the global
///    `EXECUTOR_ID` so [`Ctx::start`] can tag new tasks.
/// 3. Write an initial heartbeat, then run both recovery passes
///    (timeout/deadline-based, then heartbeat/staleness-based).
/// 4. Reset orphaned RUNNING steps of recovered workflows, auto-dispatch
///    registered workflows, and start the background heartbeat and
///    recovery-dispatch loops.
///
/// # Errors
/// Returns [`DurableError`] if connecting, migrating, the initial heartbeat,
/// or either recovery pass fails. (Step resets only log a warning.)
pub async fn init_with_config(
    database_url: &str,
    config: HeartbeatConfig,
) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
    let mut opt = ConnectOptions::new(database_url);
    // Make both the application schema and the `durable` schema visible
    // to unqualified queries.
    opt.set_schema_search_path("public,durable");
    let db = Database::connect(opt).await?;

    // Run migrations
    durable_db::Migrator::up(&db, None).await?;

    // Create executor with a unique ID for this process
    // (e.g. "exec-<pid>-<uuid>", fresh on every start).
    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
    // Best-effort publish: a poisoned lock is silently skipped.
    if let Ok(mut guard) = EXECUTOR_ID.write() {
        *guard = Some(eid.clone());
    }
    let executor = Executor::new(db.clone(), eid);

    // Write initial heartbeat so other workers know we're alive
    executor.heartbeat().await?;

    let mut all_recovered = Vec::new();

    // Recover stale tasks: timeout/deadline-based
    let recovered = executor.recover().await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks (timeout/deadline)",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Recover stale tasks: heartbeat-based (dead workers, unknown executors, orphaned NULL)
    let recovered = executor
        .recover_stale_tasks(config.staleness_threshold)
        .await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks from dead/unknown workers",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Reset orphaned RUNNING steps of recovered workflows back to PENDING
    // so they can be re-executed; failure here is non-fatal (warn only).
    if !all_recovered.is_empty() {
        let recovered_ids: Vec<Uuid> = all_recovered.iter().map(|r| r.id).collect();
        match executor.reset_orphaned_steps(&recovered_ids).await {
            Ok(n) if n > 0 => {
                tracing::info!("reset {n} orphaned steps to PENDING");
            }
            Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
            _ => {}
        }
    }

    // Auto-dispatch registered workflows
    dispatch_recovered(&db, &all_recovered);

    // Start background heartbeat loop
    executor.start_heartbeat(&config);

    // Start recovery loop that also auto-dispatches
    start_recovery_dispatch_loop(
        db.clone(),
        executor.executor_id().to_string(),
        config.staleness_threshold,
    );

    tracing::info!("durable initialized (executor={})", executor.executor_id());
    Ok((db, all_recovered))
}
193
/// Default backoff between automatic workflow recovery attempts.
const RECOVERY_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);

/// Run a workflow with automatic recovery on failure (DBOS-style).
///
/// When the workflow function returns `Err`:
/// 1. Mark the workflow FAILED
/// 2. If `recovery_count < max_recovery_attempts`, reset and re-execute
/// 3. If recovery limit exceeded, leave it FAILED
///
/// Completed steps replay from checkpoints on each recovery attempt.
async fn run_workflow_with_recovery(
    db: DatabaseConnection,
    task_id: Uuid,
    resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
) {
    loop {
        match Ctx::from_id(&db, task_id).await {
            Ok(ctx) => {
                // Run the workflow, catching both Err returns and panics (.unwrap(), etc.)
                // by isolating it in its own spawned task; a panic surfaces as JoinError.
                let result = tokio::task::spawn(async move { (resume_fn)(ctx).await }).await;

                let err_msg = match result {
                    Ok(Ok(())) => return, // workflow completed successfully
                    Ok(Err(e)) => e.to_string(),
                    Err(join_err) => {
                        // Panic or cancellation inside the spawned task
                        if join_err.is_panic() {
                            // Panic payloads are usually `String` (from `panic!`
                            // with formatting) or `&'static str`; try both.
                            let panic_msg = match join_err.into_panic().downcast::<String>() {
                                Ok(msg) => *msg,
                                Err(payload) => match payload.downcast::<&str>() {
                                    Ok(msg) => msg.to_string(),
                                    Err(_) => "unknown panic".to_string(),
                                },
                            };
                            format!("panic: {panic_msg}")
                        } else {
                            "task cancelled".to_string()
                        }
                    }
                };

                // Mark workflow as FAILED; if even that fails, give up on
                // this workflow rather than loop forever.
                if let Err(fail_err) =
                    Ctx::fail_by_id(&db, task_id, &err_msg).await
                {
                    tracing::error!(
                        id = %task_id,
                        error = %fail_err,
                        "failed to mark workflow as FAILED"
                    );
                    return;
                }

                tracing::warn!(
                    id = %task_id,
                    error = %err_msg,
                    "workflow failed, attempting automatic recovery"
                );

                // Attempt auto-recovery: reset FAILED → RUNNING, then loop to
                // re-execute after a short backoff.
                match Ctx::resume_failed(&db, task_id).await {
                    Ok(_) => {
                        tracing::info!(
                            id = %task_id,
                            "workflow auto-recovery: reset succeeded, re-executing"
                        );
                        tokio::time::sleep(RECOVERY_BACKOFF).await;
                        continue; // re-execute the workflow
                    }
                    // Recovery budget exhausted — the task stays FAILED.
                    Err(DurableError::MaxRecoveryExceeded(_)) => {
                        tracing::error!(
                            id = %task_id,
                            "workflow exceeded max recovery attempts, staying FAILED"
                        );
                        return;
                    }
                    Err(recover_err) => {
                        tracing::error!(
                            id = %task_id,
                            error = %recover_err,
                            "workflow auto-recovery failed"
                        );
                        return;
                    }
                }
            }
            // Could not even build a Ctx for the task (e.g. DB error);
            // nothing more we can do here.
            Err(e) => {
                tracing::error!(
                    id = %task_id,
                    error = %e,
                    "failed to attach to workflow"
                );
                return;
            }
        }
    }
}
292
293/// Spawn registered workflow functions for recovered root tasks.
294/// Tasks are already RUNNING (claimed atomically by the recovery SQL).
295fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
296    for task in recovered {
297        // Only auto-dispatch root workflows (children are driven by their parent)
298        if task.parent_id.is_some() {
299            continue;
300        }
301
302        let lookup_key = task.handler.as_deref().unwrap_or(&task.name);
303        if let Some(reg) = find_workflow(lookup_key) {
304            let db_inner = db.clone();
305            let task_id = task.id;
306            let task_name = task.name.clone();
307            let resume = reg.resume_fn;
308            tokio::spawn(async move {
309                tracing::info!(
310                    workflow = %task_name,
311                    id = %task_id,
312                    "auto-resuming recovered workflow"
313                );
314                run_workflow_with_recovery(db_inner, task_id, resume).await;
315            });
316        } else {
317            tracing::warn!(
318                workflow = %task.name,
319                handler = ?task.handler,
320                id = %task.id,
321                "no registered handler for recovered task — use Ctx::from_id() to resume manually"
322            );
323        }
324    }
325}
326
327/// Spawn a background loop that recovers stale tasks AND auto-dispatches them.
328fn start_recovery_dispatch_loop(
329    db: DatabaseConnection,
330    executor_id: String,
331    staleness_threshold: std::time::Duration,
332) {
333    tokio::spawn(async move {
334        let executor = Executor::new(db.clone(), executor_id);
335        let mut ticker = tokio::time::interval(staleness_threshold);
336        loop {
337            ticker.tick().await;
338
339            // Collect all recovered workflow IDs so we can reset their orphaned steps.
340            let mut all_recovered_ids = Vec::new();
341
342            // Timeout-based recovery
343            match executor.recover().await {
344                Ok(ref recovered) if !recovered.is_empty() => {
345                    tracing::info!(
346                        "recovered {} stale tasks (timeout/deadline)",
347                        recovered.len()
348                    );
349                    all_recovered_ids.extend(recovered.iter().map(|r| r.id));
350                    dispatch_recovered(&db, recovered);
351                }
352                Err(e) => tracing::warn!("timeout recovery failed: {e}"),
353                _ => {}
354            }
355
356            // Heartbeat-based recovery
357            match executor.recover_stale_tasks(staleness_threshold).await {
358                Ok(ref recovered) if !recovered.is_empty() => {
359                    tracing::info!(
360                        "recovered {} stale tasks from dead workers",
361                        recovered.len()
362                    );
363                    all_recovered_ids.extend(recovered.iter().map(|r| r.id));
364                    dispatch_recovered(&db, recovered);
365                }
366                Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
367                _ => {}
368            }
369
370            // Reset orphaned RUNNING steps of recovered workflows back to
371            // PENDING so they can be re-executed. Without this, steps that
372            // committed TX1 (set RUNNING) but crashed before TX2 (checkpoint)
373            // would be permanently stuck.
374            if !all_recovered_ids.is_empty() {
375                match executor.reset_orphaned_steps(&all_recovered_ids).await {
376                    Ok(n) if n > 0 => {
377                        tracing::info!("reset {n} orphaned steps to PENDING");
378                    }
379                    Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
380                    _ => {}
381                }
382            }
383        }
384    });
385}
386
387/// Initialize durable: connect to Postgres and run migrations only.
388///
389/// Does **not** start heartbeat or recovery loops, and does **not** set the
390/// global executor ID. Use this for tests, migrations-only scripts, or when
391/// you manage the [`Executor`] yourself.
392///
393/// ```ignore
394/// let db = durable::init_db("postgres://localhost/mydb").await?;
395/// ```
396pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
397    let mut opt = ConnectOptions::new(database_url);
398    opt.set_schema_search_path("public,durable");
399    let db = Database::connect(opt).await?;
400    durable_db::Migrator::up(&db, None).await?;
401    tracing::info!("durable initialized (db only)");
402    Ok(db)
403}