Skip to main content

durable/
lib.rs

1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14use sea_orm::{ConnectOptions, ConnectionTrait, Database, DatabaseConnection};
15use sea_orm_migration::MigratorTrait;
16use std::sync::RwLock;
17
/// Global executor ID set by [`init`]. Read by [`Ctx::start`] to tag tasks
/// so heartbeat-based recovery can find them after a crash.
///
/// Starts out as `None`; [`init_with_config`] stores the generated ID here.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns the executor ID set by [`init`], or `None` if `init` was not called
/// (e.g. when using `init_db` directly in tests).
///
/// The returned `String` is a clone of the stored value, so the lock is held
/// only for the duration of this call.
pub fn executor_id() -> Option<String> {
    // A poisoned lock only means some thread panicked while holding a guard.
    // The stored `Option<String>` is replaced in a single assignment, so it is
    // never observed half-written; read through the poison instead of
    // pretending `init` was never called.
    EXECUTOR_ID
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}
27
28/// Initialize durable: connect to Postgres, run migrations, start heartbeat,
29/// and recover stale tasks from prior crashes.
30///
31/// After this call, [`Ctx::start`] automatically tags tasks with the executor
32/// ID for crash recovery. Recovered tasks from prior crashes are returned so
33/// the caller can resume them with [`Ctx::from_id`].
34///
35/// Uses [`HeartbeatConfig::default()`] (60 s heartbeat, 180 s staleness).
36/// For custom intervals use [`init_with_config`].
37///
38/// ```ignore
39/// let (db, recovered) = durable::init("postgres://localhost/mydb").await?;
40///
41/// // Resume any interrupted workflows
42/// for task in &recovered {
43///     let ctx = Ctx::from_id(&db, task.id).await?;
44///     // re-run your workflow function with ctx...
45/// }
46///
47/// // Start new workflows — executor_id is set automatically
48/// let ctx = Ctx::start(&db, "my_workflow", None).await?;
49/// ```
50pub async fn init(
51    database_url: &str,
52) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
53    init_with_config(database_url, HeartbeatConfig::default()).await
54}
55
56/// Like [`init`] but with a custom [`HeartbeatConfig`].
57pub async fn init_with_config(
58    database_url: &str,
59    config: HeartbeatConfig,
60) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
61    let mut opt = ConnectOptions::new(database_url);
62    opt.set_schema_search_path("public,durable");
63    let db = Database::connect(opt).await?;
64
65    // Run migrations
66    durable_db::Migrator::up(&db, None).await?;
67
68    // Create executor with a unique ID for this process
69    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
70    // Store globally so Ctx::start() can tag tasks automatically
71    if let Ok(mut guard) = EXECUTOR_ID.write() {
72        *guard = Some(eid.clone());
73    }
74    let executor = Executor::new(db.clone(), eid);
75
76    // Write initial heartbeat so other workers know we're alive
77    executor.heartbeat().await?;
78
79    let mut all_recovered = Vec::new();
80
81    // Recover stale tasks: timeout/deadline-based
82    let recovered = executor.recover().await?;
83    if !recovered.is_empty() {
84        tracing::info!(
85            "recovered {} stale tasks (timeout/deadline)",
86            recovered.len()
87        );
88    }
89    all_recovered.extend(recovered);
90
91    // Recover stale tasks: heartbeat-based (dead workers, unknown executors, orphaned NULL)
92    let recovered = executor
93        .recover_stale_tasks(config.staleness_threshold)
94        .await?;
95    if !recovered.is_empty() {
96        tracing::info!(
97            "recovered {} stale tasks from dead/unknown workers",
98            recovered.len()
99        );
100    }
101    all_recovered.extend(recovered);
102
103    // Set recovered tasks back to RUNNING and claim them
104    for task in &all_recovered {
105        let sql = format!(
106            "UPDATE durable.task SET status = 'RUNNING', started_at = now() \
107             WHERE id = '{}' AND status = 'PENDING'",
108            task.id
109        );
110        db.execute(sea_orm::Statement::from_string(
111            sea_orm::DbBackend::Postgres,
112            sql,
113        ))
114        .await?;
115    }
116
117    // Start background heartbeat + recovery loops
118    executor.start_heartbeat(&config);
119    executor.start_recovery_loop(&config);
120
121    tracing::info!("durable initialized (executor={})", executor.executor_id());
122    Ok((db, all_recovered))
123}
124
125/// Initialize durable: connect to Postgres and run migrations only.
126///
127/// Does **not** start heartbeat or recovery loops, and does **not** set the
128/// global executor ID. Use this for tests, migrations-only scripts, or when
129/// you manage the [`Executor`] yourself.
130///
131/// ```ignore
132/// let db = durable::init_db("postgres://localhost/mydb").await?;
133/// ```
134pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
135    let mut opt = ConnectOptions::new(database_url);
136    opt.set_schema_search_path("public,durable");
137    let db = Database::connect(opt).await?;
138    durable_db::Migrator::up(&db, None).await?;
139    tracing::info!("durable initialized (db only)");
140    Ok(db)
141}