//! durable/lib.rs — crate root: public re-exports, workflow auto-registration,
//! and initialization helpers (connect, migrate, heartbeat, crash recovery).

1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14// Re-export so macro-generated code can reference `durable::inventory::submit!`
15pub use inventory;
16
17use sea_orm::{ConnectOptions, Database, DatabaseConnection};
18use sea_orm_migration::MigratorTrait;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::RwLock;
22use uuid::Uuid;
23
/// Process-global executor ID, populated by [`init`]. [`Ctx::start`] reads it
/// to tag tasks so heartbeat-based recovery can find them after a crash.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns the executor ID set by [`init`], or `None` if `init` was not called.
pub fn executor_id() -> Option<String> {
    // A poisoned lock is treated the same as "not initialized".
    match EXECUTOR_ID.read() {
        Ok(guard) => guard.clone(),
        Err(_) => None,
    }
}
32
33// ── Workflow auto-registration ──────────────────────────────────
34
/// A compiled-in registration of a workflow function for automatic crash recovery.
///
/// Produced by `#[durable::workflow]` and collected at link time by `inventory`.
/// Only workflows with a single `ctx: Ctx` parameter are registered.
pub struct WorkflowRegistration {
    /// The handler name — matches the function name, used for recovery lookup
    /// via the `handler` column in `durable.task` (see `find_workflow`).
    pub name: &'static str,
    /// Resumes the workflow given a `Ctx`. Return type is erased; during
    /// recovery we only care about driving the workflow to completion.
    /// The boxed future is `Send` because recovered workflows are run via
    /// `tokio::spawn` (see `dispatch_recovered`), which requires it.
    pub resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
}
47
48inventory::collect!(WorkflowRegistration);
49
50/// Look up a registered workflow by name.
51pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
52    inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
53}
54
55// ── Initialization ──────────────────────────────────────────────
56
57/// Initialize durable: connect to Postgres, run migrations, start heartbeat,
58/// recover stale tasks, and auto-resume registered workflows.
59///
60/// After this call, [`Ctx::start`] automatically tags tasks with the executor
61/// ID for crash recovery. Recovered root workflows that have a matching
62/// `#[durable::workflow]` registration are automatically spawned as tokio
63/// tasks.
64///
65/// Uses [`HeartbeatConfig::default()`] (60 s heartbeat, 180 s staleness).
66/// For custom intervals use [`init_with_config`].
67///
68/// ```ignore
69/// let (db, recovered) = durable::init("postgres://localhost/mydb").await?;
70/// ```
71pub async fn init(
72    database_url: &str,
73) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
74    init_with_config(database_url, HeartbeatConfig::default()).await
75}
76
77/// Like [`init`] but with a custom [`HeartbeatConfig`].
78pub async fn init_with_config(
79    database_url: &str,
80    config: HeartbeatConfig,
81) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
82    let mut opt = ConnectOptions::new(database_url);
83    opt.set_schema_search_path("public,durable");
84    let db = Database::connect(opt).await?;
85
86    // Run migrations
87    durable_db::Migrator::up(&db, None).await?;
88
89    // Create executor with a unique ID for this process
90    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
91    if let Ok(mut guard) = EXECUTOR_ID.write() {
92        *guard = Some(eid.clone());
93    }
94    let executor = Executor::new(db.clone(), eid);
95
96    // Write initial heartbeat so other workers know we're alive
97    executor.heartbeat().await?;
98
99    let mut all_recovered = Vec::new();
100
101    // Recover stale tasks: timeout/deadline-based
102    let recovered = executor.recover().await?;
103    if !recovered.is_empty() {
104        tracing::info!(
105            "recovered {} stale tasks (timeout/deadline)",
106            recovered.len()
107        );
108    }
109    all_recovered.extend(recovered);
110
111    // Recover stale tasks: heartbeat-based (dead workers, unknown executors, orphaned NULL)
112    let recovered = executor
113        .recover_stale_tasks(config.staleness_threshold)
114        .await?;
115    if !recovered.is_empty() {
116        tracing::info!(
117            "recovered {} stale tasks from dead/unknown workers",
118            recovered.len()
119        );
120    }
121    all_recovered.extend(recovered);
122
123    // Reset orphaned RUNNING steps of recovered workflows back to PENDING
124    if !all_recovered.is_empty() {
125        let recovered_ids: Vec<Uuid> = all_recovered.iter().map(|r| r.id).collect();
126        match executor.reset_orphaned_steps(&recovered_ids).await {
127            Ok(n) if n > 0 => {
128                tracing::info!("reset {n} orphaned steps to PENDING");
129            }
130            Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
131            _ => {}
132        }
133    }
134
135    // Auto-dispatch registered workflows
136    dispatch_recovered(&db, &all_recovered);
137
138    // Start background heartbeat loop
139    executor.start_heartbeat(&config);
140
141    // Start recovery loop that also auto-dispatches
142    start_recovery_dispatch_loop(
143        db.clone(),
144        executor.executor_id().to_string(),
145        config.staleness_threshold,
146    );
147
148    tracing::info!("durable initialized (executor={})", executor.executor_id());
149    Ok((db, all_recovered))
150}
151
152/// Spawn registered workflow functions for recovered root tasks.
153/// Tasks are already RUNNING (claimed atomically by the recovery SQL).
154fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
155    for task in recovered {
156        // Only auto-dispatch root workflows (children are driven by their parent)
157        if task.parent_id.is_some() {
158            continue;
159        }
160
161        let lookup_key = task.handler.as_deref().unwrap_or(&task.name);
162        if let Some(reg) = find_workflow(lookup_key) {
163            let db_inner = db.clone();
164            let task_id = task.id;
165            let task_name = task.name.clone();
166            let resume = reg.resume_fn;
167            tokio::spawn(async move {
168                tracing::info!(
169                    workflow = %task_name,
170                    id = %task_id,
171                    "auto-resuming recovered workflow"
172                );
173                match Ctx::from_id(&db_inner, task_id).await {
174                    Ok(ctx) => {
175                        if let Err(e) = (resume)(ctx).await {
176                            tracing::error!(
177                                workflow = %task_name,
178                                id = %task_id,
179                                error = %e,
180                                "recovered workflow failed"
181                            );
182                        }
183                    }
184                    Err(e) => {
185                        tracing::error!(
186                            workflow = %task_name,
187                            id = %task_id,
188                            error = %e,
189                            "failed to attach to recovered workflow"
190                        );
191                    }
192                }
193            });
194        } else {
195            tracing::warn!(
196                workflow = %task.name,
197                handler = ?task.handler,
198                id = %task.id,
199                "no registered handler for recovered task — use Ctx::from_id() to resume manually"
200            );
201        }
202    }
203}
204
205/// Spawn a background loop that recovers stale tasks AND auto-dispatches them.
206fn start_recovery_dispatch_loop(
207    db: DatabaseConnection,
208    executor_id: String,
209    staleness_threshold: std::time::Duration,
210) {
211    tokio::spawn(async move {
212        let executor = Executor::new(db.clone(), executor_id);
213        let mut ticker = tokio::time::interval(staleness_threshold);
214        loop {
215            ticker.tick().await;
216
217            // Collect all recovered workflow IDs so we can reset their orphaned steps.
218            let mut all_recovered_ids = Vec::new();
219
220            // Timeout-based recovery
221            match executor.recover().await {
222                Ok(ref recovered) if !recovered.is_empty() => {
223                    tracing::info!(
224                        "recovered {} stale tasks (timeout/deadline)",
225                        recovered.len()
226                    );
227                    all_recovered_ids.extend(recovered.iter().map(|r| r.id));
228                    dispatch_recovered(&db, recovered);
229                }
230                Err(e) => tracing::warn!("timeout recovery failed: {e}"),
231                _ => {}
232            }
233
234            // Heartbeat-based recovery
235            match executor.recover_stale_tasks(staleness_threshold).await {
236                Ok(ref recovered) if !recovered.is_empty() => {
237                    tracing::info!(
238                        "recovered {} stale tasks from dead workers",
239                        recovered.len()
240                    );
241                    all_recovered_ids.extend(recovered.iter().map(|r| r.id));
242                    dispatch_recovered(&db, recovered);
243                }
244                Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
245                _ => {}
246            }
247
248            // Reset orphaned RUNNING steps of recovered workflows back to
249            // PENDING so they can be re-executed. Without this, steps that
250            // committed TX1 (set RUNNING) but crashed before TX2 (checkpoint)
251            // would be permanently stuck.
252            if !all_recovered_ids.is_empty() {
253                match executor.reset_orphaned_steps(&all_recovered_ids).await {
254                    Ok(n) if n > 0 => {
255                        tracing::info!("reset {n} orphaned steps to PENDING");
256                    }
257                    Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
258                    _ => {}
259                }
260            }
261        }
262    });
263}
264
265/// Initialize durable: connect to Postgres and run migrations only.
266///
267/// Does **not** start heartbeat or recovery loops, and does **not** set the
268/// global executor ID. Use this for tests, migrations-only scripts, or when
269/// you manage the [`Executor`] yourself.
270///
271/// ```ignore
272/// let db = durable::init_db("postgres://localhost/mydb").await?;
273/// ```
274pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
275    let mut opt = ConnectOptions::new(database_url);
276    opt.set_schema_search_path("public,durable");
277    let db = Database::connect(opt).await?;
278    durable_db::Migrator::up(&db, None).await?;
279    tracing::info!("durable initialized (db only)");
280    Ok(db)
281}