//! Crate root for `durable`: durable-workflow context, executor, and
//! auto-registration plumbing backed by Postgres.
1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14// Re-export so macro-generated code can reference `durable::inventory::submit!`
15pub use inventory;
16
17use sea_orm::{ConnectOptions, Database, DatabaseConnection};
18use sea_orm_migration::MigratorTrait;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::RwLock;
22
/// Global executor ID set by [`init`]. Read by [`Ctx::start`] to tag tasks
/// so heartbeat-based recovery can find them after a crash.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns the executor ID set by [`init`], or `None` if `init` was not called.
///
/// Recovers from lock poisoning instead of silently reporting `None`: the
/// stored `Option<String>` is replaced atomically under the lock, so a panic
/// in another holder cannot leave it in a torn state.
pub fn executor_id() -> Option<String> {
    EXECUTOR_ID
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}
31
32// ── Workflow auto-registration ──────────────────────────────────
33
/// A compiled-in registration of a workflow function for automatic crash recovery.
///
/// Produced by `#[durable::workflow]` and collected at link time by `inventory`.
/// Only workflows with a single `ctx: Ctx` parameter are registered.
pub struct WorkflowRegistration {
    /// The handler name — matches the function name, used for recovery lookup
    /// via the `handler` column in `durable.task`.
    pub name: &'static str,
    /// Resumes the workflow given a `Ctx`. Return type is erased; during
    /// recovery we only care about driving the workflow to completion.
    /// The `Send` bound lets recovery drive the future from `tokio::spawn`.
    pub resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
}
46
47inventory::collect!(WorkflowRegistration);
48
49/// Look up a registered workflow by name.
50pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
51    inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
52}
53
54// ── Initialization ──────────────────────────────────────────────
55
56/// Initialize durable: connect to Postgres, run migrations, start heartbeat,
57/// recover stale tasks, and auto-resume registered workflows.
58///
59/// After this call, [`Ctx::start`] automatically tags tasks with the executor
60/// ID for crash recovery. Recovered root workflows that have a matching
61/// `#[durable::workflow]` registration are automatically spawned as tokio
62/// tasks.
63///
64/// Uses [`HeartbeatConfig::default()`] (60 s heartbeat, 180 s staleness).
65/// For custom intervals use [`init_with_config`].
66///
67/// ```ignore
68/// let (db, recovered) = durable::init("postgres://localhost/mydb").await?;
69/// ```
70pub async fn init(
71    database_url: &str,
72) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
73    init_with_config(database_url, HeartbeatConfig::default()).await
74}
75
76/// Like [`init`] but with a custom [`HeartbeatConfig`].
77pub async fn init_with_config(
78    database_url: &str,
79    config: HeartbeatConfig,
80) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
81    let mut opt = ConnectOptions::new(database_url);
82    opt.set_schema_search_path("public,durable");
83    let db = Database::connect(opt).await?;
84
85    // Run migrations
86    durable_db::Migrator::up(&db, None).await?;
87
88    // Create executor with a unique ID for this process
89    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
90    if let Ok(mut guard) = EXECUTOR_ID.write() {
91        *guard = Some(eid.clone());
92    }
93    let executor = Executor::new(db.clone(), eid);
94
95    // Write initial heartbeat so other workers know we're alive
96    executor.heartbeat().await?;
97
98    let mut all_recovered = Vec::new();
99
100    // Recover stale tasks: timeout/deadline-based
101    let recovered = executor.recover().await?;
102    if !recovered.is_empty() {
103        tracing::info!(
104            "recovered {} stale tasks (timeout/deadline)",
105            recovered.len()
106        );
107    }
108    all_recovered.extend(recovered);
109
110    // Recover stale tasks: heartbeat-based (dead workers, unknown executors, orphaned NULL)
111    let recovered = executor
112        .recover_stale_tasks(config.staleness_threshold)
113        .await?;
114    if !recovered.is_empty() {
115        tracing::info!(
116            "recovered {} stale tasks from dead/unknown workers",
117            recovered.len()
118        );
119    }
120    all_recovered.extend(recovered);
121
122    // Set recovered tasks back to RUNNING and auto-dispatch registered workflows
123    dispatch_recovered(&db, &all_recovered);
124
125    // Start background heartbeat loop
126    executor.start_heartbeat(&config);
127
128    // Start recovery loop that also auto-dispatches
129    start_recovery_dispatch_loop(
130        db.clone(),
131        executor.executor_id().to_string(),
132        config.staleness_threshold,
133    );
134
135    tracing::info!("durable initialized (executor={})", executor.executor_id());
136    Ok((db, all_recovered))
137}
138
139/// Spawn registered workflow functions for recovered root tasks.
140/// Tasks are already RUNNING (claimed atomically by the recovery SQL).
141fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
142    for task in recovered {
143        // Only auto-dispatch root workflows (children are driven by their parent)
144        if task.parent_id.is_some() {
145            continue;
146        }
147
148        let lookup_key = task.handler.as_deref().unwrap_or(&task.name);
149        if let Some(reg) = find_workflow(lookup_key) {
150            let db_inner = db.clone();
151            let task_id = task.id;
152            let task_name = task.name.clone();
153            let resume = reg.resume_fn;
154            tokio::spawn(async move {
155                tracing::info!(
156                    workflow = %task_name,
157                    id = %task_id,
158                    "auto-resuming recovered workflow"
159                );
160                match Ctx::from_id(&db_inner, task_id).await {
161                    Ok(ctx) => {
162                        if let Err(e) = (resume)(ctx).await {
163                            tracing::error!(
164                                workflow = %task_name,
165                                id = %task_id,
166                                error = %e,
167                                "recovered workflow failed"
168                            );
169                        }
170                    }
171                    Err(e) => {
172                        tracing::error!(
173                            workflow = %task_name,
174                            id = %task_id,
175                            error = %e,
176                            "failed to attach to recovered workflow"
177                        );
178                    }
179                }
180            });
181        } else {
182            tracing::warn!(
183                workflow = %task.name,
184                handler = ?task.handler,
185                id = %task.id,
186                "no registered handler for recovered task — use Ctx::from_id() to resume manually"
187            );
188        }
189    }
190}
191
192/// Spawn a background loop that recovers stale tasks AND auto-dispatches them.
193fn start_recovery_dispatch_loop(
194    db: DatabaseConnection,
195    executor_id: String,
196    staleness_threshold: std::time::Duration,
197) {
198    tokio::spawn(async move {
199        let executor = Executor::new(db.clone(), executor_id);
200        let mut ticker = tokio::time::interval(staleness_threshold);
201        loop {
202            ticker.tick().await;
203
204            // Timeout-based recovery
205            match executor.recover().await {
206                Ok(ref recovered) if !recovered.is_empty() => {
207                    tracing::info!(
208                        "recovered {} stale tasks (timeout/deadline)",
209                        recovered.len()
210                    );
211                    dispatch_recovered(&db, recovered);
212                }
213                Err(e) => tracing::warn!("timeout recovery failed: {e}"),
214                _ => {}
215            }
216
217            // Heartbeat-based recovery
218            match executor.recover_stale_tasks(staleness_threshold).await {
219                Ok(ref recovered) if !recovered.is_empty() => {
220                    tracing::info!(
221                        "recovered {} stale tasks from dead workers",
222                        recovered.len()
223                    );
224                    dispatch_recovered(&db, recovered);
225                }
226                Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
227                _ => {}
228            }
229        }
230    });
231}
232
233/// Initialize durable: connect to Postgres and run migrations only.
234///
235/// Does **not** start heartbeat or recovery loops, and does **not** set the
236/// global executor ID. Use this for tests, migrations-only scripts, or when
237/// you manage the [`Executor`] yourself.
238///
239/// ```ignore
240/// let db = durable::init_db("postgres://localhost/mydb").await?;
241/// ```
242pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
243    let mut opt = ConnectOptions::new(database_url);
244    opt.set_schema_search_path("public,durable");
245    let db = Database::connect(opt).await?;
246    durable_db::Migrator::up(&db, None).await?;
247    tracing::info!("durable initialized (db only)");
248    Ok(db)
249}