//! durable/lib.rs — crate root for the `durable` workflow engine.

1pub mod ctx;
2pub mod error;
3pub mod executor;
4pub mod scheduler;
5
6pub use ctx::Ctx;
7pub use ctx::RetryPolicy;
8pub use ctx::StartResult;
9pub use ctx::{TaskQuery, TaskSort, TaskSummary};
10pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
11pub use durable_macros::{step, workflow};
12pub use error::DurableError;
13pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
14pub use scheduler::{SchedulerConfig, next_run};
15pub use sea_orm::DatabaseTransaction;
16
17// Re-export so macro-generated code can reference `durable::inventory::submit!`
18pub use inventory;
19
20use sea_orm::{ConnectOptions, Database, DatabaseConnection};
21use sea_orm_migration::MigratorTrait;
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::RwLock;
25use uuid::Uuid;
26
/// Global executor ID set by [`init`]. Read by [`Ctx::start`] to tag tasks
/// so heartbeat-based recovery can find them after a crash.
///
/// Remains `None` until [`init`] / [`init_with_config`] runs; [`init_db`]
/// deliberately does not set it. Guarded by an `RwLock` because it is written
/// once at startup and read on every task start.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);
30
31/// Returns the executor ID set by [`init`], or `None` if `init` was not called.
32pub fn executor_id() -> Option<String> {
33    EXECUTOR_ID.read().ok().and_then(|g| g.clone())
34}
35
// ── Workflow auto-registration ──────────────────────────────────

/// Type alias for a workflow resume function pointer.
///
/// Takes the workflow's [`Ctx`] and returns a boxed future that drives the
/// workflow to completion. The future must be `Send` because recovery spawns
/// it onto the tokio runtime (see `run_workflow_with_recovery`).
pub type ResumeFn = fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>;
40
/// A compiled-in registration of a workflow function for automatic crash recovery.
///
/// Produced by `#[durable::workflow]` and collected at link time by `inventory`.
/// Only workflows with a single `ctx: Ctx` parameter are registered.
/// Looked up at runtime via [`find_workflow`].
pub struct WorkflowRegistration {
    /// The handler name — matches the function name, used for recovery lookup
    /// via the `handler` column in `durable.task`.
    pub name: &'static str,
    /// Resumes the workflow given a `Ctx`. Return type is erased; during
    /// recovery we only care about driving the workflow to completion.
    pub resume_fn: ResumeFn,
}

// Declare the collection point so `inventory::submit!` instances (emitted by
// the `#[workflow]` macro) can be enumerated with `inventory::iter`.
inventory::collect!(WorkflowRegistration);
55
56/// Look up a registered workflow by name.
57pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
58    inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
59}
60
// ── Initialization ──────────────────────────────────────────────

63/// Resume a failed workflow from its last completed step (DBOS-style).
64///
65/// Resets the workflow from FAILED → RUNNING, resets failed child steps to
66/// PENDING, and re-dispatches the registered workflow function. Completed
67/// steps replay from their checkpointed outputs.
68///
69/// If no `WorkflowRegistration` is found for the workflow's handler, the
70/// workflow is still reset but not auto-dispatched — use `Ctx::from_id()`
71/// to drive it manually.
72///
73/// ```ignore
74/// durable::resume_workflow(&db, workflow_id).await?;
75/// ```
76pub async fn resume_workflow(
77    db: &DatabaseConnection,
78    task_id: uuid::Uuid,
79) -> Result<(), DurableError> {
80    let handler = Ctx::resume_failed(db, task_id).await?;
81
82    // Look up registered handler and auto-dispatch with recovery
83    let lookup_key = handler.as_deref().unwrap_or("");
84    if let Some(reg) = find_workflow(lookup_key) {
85        let db_inner = db.clone();
86        let resume = reg.resume_fn;
87        tokio::spawn(async move {
88            tracing::info!(
89                id = %task_id,
90                "re-dispatching resumed workflow"
91            );
92            run_workflow_with_recovery(db_inner, task_id, resume).await;
93        });
94    } else {
95        tracing::warn!(
96            id = %task_id,
97            handler = ?handler,
98            "no registered handler for resumed workflow — use Ctx::from_id() to resume manually"
99        );
100    }
101
102    Ok(())
103}
104
/// Initialize durable: connect to Postgres, run migrations, start heartbeat,
/// recover stale tasks, and auto-resume registered workflows.
///
/// After this call, [`Ctx::start`] automatically tags tasks with the executor
/// ID for crash recovery. Recovered root workflows that have a matching
/// `#[durable::workflow]` registration are automatically spawned as tokio
/// tasks.
///
/// Uses [`HeartbeatConfig::default()`] (60 s heartbeat, 180 s staleness).
/// For custom intervals use [`init_with_config`].
///
/// # Errors
///
/// Propagates any connection, migration, or recovery error from
/// [`init_with_config`].
///
/// ```ignore
/// let (db, recovered) = durable::init("postgres://localhost/mydb").await?;
/// ```
pub async fn init(
    database_url: &str,
) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
    // Thin wrapper: all the work happens in init_with_config.
    init_with_config(database_url, HeartbeatConfig::default()).await
}
124
/// Like [`init`] but with a custom [`HeartbeatConfig`].
///
/// Startup order matters here: migrate → set executor ID → initial heartbeat
/// → recover (timeout, then heartbeat based) → reset orphaned steps →
/// dispatch → start background loops.
///
/// # Errors
///
/// Returns an error if the connection, migrations, the initial heartbeat, or
/// either recovery pass fails. Background loops started afterwards log their
/// own failures instead of returning them.
pub async fn init_with_config(
    database_url: &str,
    config: HeartbeatConfig,
) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
    // "durable" schema in the search path lets entity queries resolve
    // durable.* tables without qualifying every statement.
    let mut opt = ConnectOptions::new(database_url);
    opt.set_schema_search_path("public,durable");
    let db = Database::connect(opt).await?;

    // Run migrations
    durable_db::Migrator::up(&db, None).await?;

    // Create executor with a unique ID for this process
    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
    // Best-effort: a poisoned lock silently skips the write, leaving
    // executor_id() returning None.
    if let Ok(mut guard) = EXECUTOR_ID.write() {
        *guard = Some(eid.clone());
    }
    let executor = Executor::new(db.clone(), eid);

    // Write initial heartbeat so other workers know we're alive
    executor.heartbeat().await?;

    let mut all_recovered = Vec::new();

    // Recover stale tasks: timeout/deadline-based
    let recovered = executor.recover().await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks (timeout/deadline)",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Recover stale tasks: heartbeat-based (dead workers, unknown executors, orphaned NULL)
    let recovered = executor
        .recover_stale_tasks(config.staleness_threshold)
        .await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks from dead/unknown workers",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Reset orphaned RUNNING steps of recovered workflows back to PENDING.
    // Reset failures are logged, not fatal — the recovery loop retries later.
    if !all_recovered.is_empty() {
        let recovered_ids: Vec<Uuid> = all_recovered.iter().map(|r| r.id).collect();
        match executor.reset_orphaned_steps(&recovered_ids).await {
            Ok(n) if n > 0 => {
                tracing::info!("reset {n} orphaned steps to PENDING");
            }
            Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
            _ => {}
        }
    }

    // Auto-dispatch registered workflows
    dispatch_recovered(&db, &all_recovered);

    // Start background heartbeat loop — runs for the process lifetime.
    // The JoinHandle is intentionally dropped; the spawned task is detached.
    // Panics inside the loop are caught per-tick (see Executor::start_heartbeat).
    drop(executor.start_heartbeat(&config));

    // Start recovery loop that also auto-dispatches
    start_recovery_dispatch_loop(
        db.clone(),
        executor.executor_id().to_string(),
        config.staleness_threshold,
    );

    // Start cron scheduler loop
    scheduler::start_scheduler_loop(
        db.clone(),
        executor.executor_id().to_string(),
        &SchedulerConfig::default(),
    );

    tracing::info!("durable initialized (executor={})", executor.executor_id());
    Ok((db, all_recovered))
}
208
/// Default backoff between automatic workflow recovery attempts.
///
/// Applied in `run_workflow_with_recovery` after a successful FAILED → RUNNING
/// reset, before re-executing the workflow.
const RECOVERY_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
211
212/// Run a workflow with automatic recovery on failure (DBOS-style).
213///
214/// When the workflow function returns `Err`:
215/// 1. Mark the workflow FAILED
216/// 2. If `recovery_count < max_recovery_attempts`, reset and re-execute
217/// 3. If recovery limit exceeded, leave it FAILED
218///
219/// Completed steps replay from checkpoints on each recovery attempt.
220async fn run_workflow_with_recovery(db: DatabaseConnection, task_id: Uuid, resume_fn: ResumeFn) {
221    loop {
222        match Ctx::from_id(&db, task_id).await {
223            Ok(ctx) => {
224                // Run the workflow, catching both Err returns and panics (.unwrap(), etc.)
225                let result = tokio::task::spawn(async move { (resume_fn)(ctx).await }).await;
226
227                let err_msg = match result {
228                    Ok(Ok(())) => return, // workflow completed successfully
229                    Ok(Err(e)) => e.to_string(),
230                    Err(join_err) => {
231                        // Panic or cancellation inside the spawned task
232                        if join_err.is_panic() {
233                            let panic_msg = match join_err.into_panic().downcast::<String>() {
234                                Ok(msg) => *msg,
235                                Err(payload) => match payload.downcast::<&str>() {
236                                    Ok(msg) => msg.to_string(),
237                                    Err(_) => "unknown panic".to_string(),
238                                },
239                            };
240                            format!("panic: {panic_msg}")
241                        } else {
242                            "task cancelled".to_string()
243                        }
244                    }
245                };
246
247                // Mark workflow as FAILED
248                if let Err(fail_err) = Ctx::fail_by_id(&db, task_id, &err_msg).await {
249                    tracing::error!(
250                        id = %task_id,
251                        error = %fail_err,
252                        "failed to mark workflow as FAILED"
253                    );
254                    return;
255                }
256
257                tracing::warn!(
258                    id = %task_id,
259                    error = %err_msg,
260                    "workflow failed, attempting automatic recovery"
261                );
262
263                // Attempt auto-recovery
264                match Ctx::resume_failed(&db, task_id).await {
265                    Ok(_) => {
266                        tracing::info!(
267                            id = %task_id,
268                            "workflow auto-recovery: reset succeeded, re-executing"
269                        );
270                        tokio::time::sleep(RECOVERY_BACKOFF).await;
271                        continue; // re-execute the workflow
272                    }
273                    Err(DurableError::MaxRecoveryExceeded(_)) => {
274                        tracing::error!(
275                            id = %task_id,
276                            "workflow exceeded max recovery attempts, staying FAILED"
277                        );
278                        return;
279                    }
280                    Err(recover_err) => {
281                        tracing::error!(
282                            id = %task_id,
283                            error = %recover_err,
284                            "workflow auto-recovery failed"
285                        );
286                        return;
287                    }
288                }
289            }
290            Err(e) => {
291                tracing::error!(
292                    id = %task_id,
293                    error = %e,
294                    "failed to attach to workflow"
295                );
296                return;
297            }
298        }
299    }
300}
301
302/// Spawn registered workflow functions for recovered root tasks.
303/// Tasks are already RUNNING (claimed atomically by the recovery SQL).
304fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
305    for task in recovered {
306        // Only auto-dispatch root workflows (children are driven by their parent)
307        if task.parent_id.is_some() {
308            continue;
309        }
310
311        let lookup_key = task.handler.as_deref().unwrap_or(&task.name);
312        if let Some(reg) = find_workflow(lookup_key) {
313            let db_inner = db.clone();
314            let task_id = task.id;
315            let task_name = task.name.clone();
316            let resume = reg.resume_fn;
317            tokio::spawn(async move {
318                tracing::info!(
319                    workflow = %task_name,
320                    id = %task_id,
321                    "auto-resuming recovered workflow"
322                );
323                run_workflow_with_recovery(db_inner, task_id, resume).await;
324            });
325        } else {
326            tracing::warn!(
327                workflow = %task.name,
328                handler = ?task.handler,
329                id = %task.id,
330                "no registered handler for recovered task — use Ctx::from_id() to resume manually"
331            );
332        }
333    }
334}
335
/// Spawn a background loop that recovers stale tasks AND auto-dispatches them.
///
/// Detached tokio task; runs for the process lifetime. Ticks at
/// `staleness_threshold` intervals, skipping missed ticks rather than
/// bursting to catch up. Each tick runs both recovery passes (timeout- and
/// heartbeat-based), dispatches what they return, then resets orphaned steps.
/// All failures are logged and retried on the next tick, never fatal.
fn start_recovery_dispatch_loop(
    db: DatabaseConnection,
    executor_id: String,
    staleness_threshold: std::time::Duration,
) {
    tokio::spawn(async move {
        let executor = Executor::new(db.clone(), executor_id);
        let mut ticker = tokio::time::interval(staleness_threshold);
        // Skip missed ticks so a slow pass doesn't queue up extra runs.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;

            // Collect all recovered workflow IDs so we can reset their orphaned steps.
            let mut all_recovered_ids = Vec::new();

            // Timeout-based recovery
            match executor.recover().await {
                Ok(ref recovered) if !recovered.is_empty() => {
                    tracing::info!(
                        "recovered {} stale tasks (timeout/deadline)",
                        recovered.len()
                    );
                    all_recovered_ids.extend(recovered.iter().map(|r| r.id));
                    dispatch_recovered(&db, recovered);
                }
                Err(e) => tracing::warn!("timeout recovery failed: {e}"),
                _ => {}
            }

            // Heartbeat-based recovery
            match executor.recover_stale_tasks(staleness_threshold).await {
                Ok(ref recovered) if !recovered.is_empty() => {
                    tracing::info!(
                        "recovered {} stale tasks from dead workers",
                        recovered.len()
                    );
                    all_recovered_ids.extend(recovered.iter().map(|r| r.id));
                    dispatch_recovered(&db, recovered);
                }
                Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
                _ => {}
            }

            // Reset orphaned RUNNING steps of recovered workflows back to
            // PENDING so they can be re-executed. Without this, steps that
            // committed TX1 (set RUNNING) but crashed before TX2 (checkpoint)
            // would be permanently stuck.
            if !all_recovered_ids.is_empty() {
                match executor.reset_orphaned_steps(&all_recovered_ids).await {
                    Ok(n) if n > 0 => {
                        tracing::info!("reset {n} orphaned steps to PENDING");
                    }
                    Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
                    _ => {}
                }
            }
        }
    });
}
396
397/// Initialize durable: connect to Postgres and run migrations only.
398///
399/// Does **not** start heartbeat or recovery loops, and does **not** set the
400/// global executor ID. Use this for tests, migrations-only scripts, or when
401/// you manage the [`Executor`] yourself.
402///
403/// ```ignore
404/// let db = durable::init_db("postgres://localhost/mydb").await?;
405/// ```
406pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
407    let mut opt = ConnectOptions::new(database_url);
408    opt.set_schema_search_path("public,durable");
409    let db = Database::connect(opt).await?;
410    durable_db::Migrator::up(&db, None).await?;
411    tracing::info!("durable initialized (db only)");
412    Ok(db)
413}