// durable/lib.rs — crate root
1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14// Re-export so macro-generated code can reference `durable::inventory::submit!`
15pub use inventory;
16
17use sea_orm::{ConnectOptions, ConnectionTrait, Database, DatabaseConnection};
18use sea_orm_migration::MigratorTrait;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::RwLock;
22
/// Global executor ID set by [`init`]. Read by [`Ctx::start`] to tag tasks
/// so heartbeat-based recovery can find them after a crash.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns the executor ID set by [`init`], or `None` if `init` was not called.
pub fn executor_id() -> Option<String> {
    match EXECUTOR_ID.read() {
        Ok(guard) => guard.clone(),
        // A poisoned lock is treated the same as "never initialized".
        Err(_) => None,
    }
}
31
// ── Workflow auto-registration ──────────────────────────────────

/// A compiled-in registration of a workflow function for automatic crash recovery.
///
/// Produced by `#[durable::workflow]` and collected at link time by `inventory`.
/// Only workflows with a single `ctx: Ctx` parameter are registered.
pub struct WorkflowRegistration {
    /// The workflow name — must match the `name` passed to `Ctx::start`.
    pub name: &'static str,
    /// Resumes the workflow given a `Ctx`. Return type is erased; during
    /// recovery we only care about driving the workflow to completion.
    pub resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
}

// Declares the link-time collection that `durable::inventory::submit!`
// (emitted by `#[durable::workflow]`) adds entries to; iterated by
// `find_workflow` during recovery dispatch.
inventory::collect!(WorkflowRegistration);
47
48/// Look up a registered workflow by name.
49pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
50    inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
51}
52
53// ── Initialization ──────────────────────────────────────────────
54
55/// Initialize durable: connect to Postgres, run migrations, start heartbeat,
56/// recover stale tasks, and auto-resume registered workflows.
57///
58/// After this call, [`Ctx::start`] automatically tags tasks with the executor
59/// ID for crash recovery. Recovered root workflows that have a matching
60/// `#[durable::workflow]` registration are automatically spawned as tokio
61/// tasks.
62///
63/// Uses [`HeartbeatConfig::default()`] (60 s heartbeat, 180 s staleness).
64/// For custom intervals use [`init_with_config`].
65///
66/// ```ignore
67/// let (db, recovered) = durable::init("postgres://localhost/mydb").await?;
68/// ```
69pub async fn init(
70    database_url: &str,
71) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
72    init_with_config(database_url, HeartbeatConfig::default()).await
73}
74
75/// Like [`init`] but with a custom [`HeartbeatConfig`].
76pub async fn init_with_config(
77    database_url: &str,
78    config: HeartbeatConfig,
79) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
80    let mut opt = ConnectOptions::new(database_url);
81    opt.set_schema_search_path("public,durable");
82    let db = Database::connect(opt).await?;
83
84    // Run migrations
85    durable_db::Migrator::up(&db, None).await?;
86
87    // Create executor with a unique ID for this process
88    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
89    if let Ok(mut guard) = EXECUTOR_ID.write() {
90        *guard = Some(eid.clone());
91    }
92    let executor = Executor::new(db.clone(), eid);
93
94    // Write initial heartbeat so other workers know we're alive
95    executor.heartbeat().await?;
96
97    let mut all_recovered = Vec::new();
98
99    // Recover stale tasks: timeout/deadline-based
100    let recovered = executor.recover().await?;
101    if !recovered.is_empty() {
102        tracing::info!(
103            "recovered {} stale tasks (timeout/deadline)",
104            recovered.len()
105        );
106    }
107    all_recovered.extend(recovered);
108
109    // Recover stale tasks: heartbeat-based (dead workers, unknown executors, orphaned NULL)
110    let recovered = executor
111        .recover_stale_tasks(config.staleness_threshold)
112        .await?;
113    if !recovered.is_empty() {
114        tracing::info!(
115            "recovered {} stale tasks from dead/unknown workers",
116            recovered.len()
117        );
118    }
119    all_recovered.extend(recovered);
120
121    // Set recovered tasks back to RUNNING and auto-dispatch registered workflows
122    dispatch_recovered(&db, &all_recovered).await;
123
124    // Start background heartbeat loop
125    executor.start_heartbeat(&config);
126
127    // Start recovery loop that also auto-dispatches
128    start_recovery_dispatch_loop(
129        db.clone(),
130        executor.executor_id().to_string(),
131        config.staleness_threshold,
132    );
133
134    tracing::info!("durable initialized (executor={})", executor.executor_id());
135    Ok((db, all_recovered))
136}
137
138/// Set recovered tasks to RUNNING and spawn registered workflow functions.
139async fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
140    for task in recovered {
141        // Set back to RUNNING
142        let sql = format!(
143            "UPDATE durable.task SET status = 'RUNNING', started_at = now() \
144             WHERE id = '{}' AND status = 'PENDING'",
145            task.id
146        );
147        if let Err(e) = db
148            .execute(sea_orm::Statement::from_string(
149                sea_orm::DbBackend::Postgres,
150                sql,
151            ))
152            .await
153        {
154            tracing::error!(id = %task.id, error = %e, "failed to set recovered task to RUNNING");
155            continue;
156        }
157
158        // Only auto-dispatch root workflows (children are driven by their parent)
159        if task.parent_id.is_some() {
160            continue;
161        }
162
163        if let Some(reg) = find_workflow(&task.name) {
164            let db_inner = db.clone();
165            let task_id = task.id;
166            let task_name = task.name.clone();
167            let resume = reg.resume_fn;
168            tokio::spawn(async move {
169                tracing::info!(
170                    workflow = %task_name,
171                    id = %task_id,
172                    "auto-resuming recovered workflow"
173                );
174                match Ctx::from_id(&db_inner, task_id).await {
175                    Ok(ctx) => {
176                        if let Err(e) = (resume)(ctx).await {
177                            tracing::error!(
178                                workflow = %task_name,
179                                id = %task_id,
180                                error = %e,
181                                "recovered workflow failed"
182                            );
183                        }
184                    }
185                    Err(e) => {
186                        tracing::error!(
187                            workflow = %task_name,
188                            id = %task_id,
189                            error = %e,
190                            "failed to attach to recovered workflow"
191                        );
192                    }
193                }
194            });
195        } else {
196            tracing::warn!(
197                workflow = %task.name,
198                id = %task.id,
199                "no registered handler for recovered task — use Ctx::from_id() to resume manually"
200            );
201        }
202    }
203}
204
205/// Spawn a background loop that recovers stale tasks AND auto-dispatches them.
206fn start_recovery_dispatch_loop(
207    db: DatabaseConnection,
208    executor_id: String,
209    staleness_threshold: std::time::Duration,
210) {
211    tokio::spawn(async move {
212        let executor = Executor::new(db.clone(), executor_id);
213        let mut ticker = tokio::time::interval(staleness_threshold);
214        loop {
215            ticker.tick().await;
216
217            // Timeout-based recovery
218            match executor.recover().await {
219                Ok(ref recovered) if !recovered.is_empty() => {
220                    tracing::info!(
221                        "recovered {} stale tasks (timeout/deadline)",
222                        recovered.len()
223                    );
224                    dispatch_recovered(&db, recovered).await;
225                }
226                Err(e) => tracing::warn!("timeout recovery failed: {e}"),
227                _ => {}
228            }
229
230            // Heartbeat-based recovery
231            match executor.recover_stale_tasks(staleness_threshold).await {
232                Ok(ref recovered) if !recovered.is_empty() => {
233                    tracing::info!(
234                        "recovered {} stale tasks from dead workers",
235                        recovered.len()
236                    );
237                    dispatch_recovered(&db, recovered).await;
238                }
239                Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
240                _ => {}
241            }
242        }
243    });
244}
245
246/// Initialize durable: connect to Postgres and run migrations only.
247///
248/// Does **not** start heartbeat or recovery loops, and does **not** set the
249/// global executor ID. Use this for tests, migrations-only scripts, or when
250/// you manage the [`Executor`] yourself.
251///
252/// ```ignore
253/// let db = durable::init_db("postgres://localhost/mydb").await?;
254/// ```
255pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
256    let mut opt = ConnectOptions::new(database_url);
257    opt.set_schema_search_path("public,durable");
258    let db = Database::connect(opt).await?;
259    durable_db::Migrator::up(&db, None).await?;
260    tracing::info!("durable initialized (db only)");
261    Ok(db)
262}