Skip to main content

durable/
lib.rs

1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14use sea_orm::{ConnectOptions, Database, DatabaseConnection};
15use sea_orm_migration::MigratorTrait;
16
/// Handle returned by [`init`] / [`init_with_config`].
///
/// Bundles the database connection, the executor identity of this process,
/// and any tasks recovered from prior crashes. Implements
/// `Deref<Target = DatabaseConnection>` so it can be passed anywhere a
/// `&DatabaseConnection` is expected.
///
/// # Examples
///
/// ```ignore
/// let durable = durable::init("postgres://localhost/mydb").await?;
///
/// // Use as &DatabaseConnection (via Deref)
/// let ctx = Ctx::start(&durable, "my_wf", None).await?;
///
/// // Or start with executor tracking for proper crash recovery
/// let ctx = durable.start_workflow("my_wf", None).await?;
///
/// // Inspect tasks recovered on startup
/// for task in durable.recovered() {
///     println!("recovered: {} ({})", task.name, task.id);
/// }
/// ```
pub struct DurableInstance {
    /// Connection handed out by `db()` and by the `Deref` impl.
    db: DatabaseConnection,
    /// Executor ID used to tag workflows started through this instance.
    executor_id: String,
    /// Tasks collected by the recovery passes run during initialization.
    recovered: Vec<RecoveredTask>,
}
42
43impl std::ops::Deref for DurableInstance {
44    type Target = DatabaseConnection;
45    fn deref(&self) -> &DatabaseConnection {
46        &self.db
47    }
48}
49
50impl DurableInstance {
51    /// The underlying database connection.
52    pub fn db(&self) -> &DatabaseConnection {
53        &self.db
54    }
55
56    /// The unique executor ID for this process.
57    pub fn executor_id(&self) -> &str {
58        &self.executor_id
59    }
60
61    /// Tasks recovered from prior crashes during [`init`].
62    ///
63    /// Each entry was reset from RUNNING to PENDING. To resume a recovered
64    /// workflow, call [`Ctx::from_id`] with the task's `id`.
65    pub fn recovered(&self) -> &[RecoveredTask] {
66        &self.recovered
67    }
68
69    /// Start a new root workflow, tagging it with this instance's executor_id
70    /// so heartbeat-based recovery works if this process crashes.
71    pub async fn start_workflow(
72        &self,
73        name: &str,
74        input: Option<serde_json::Value>,
75    ) -> Result<Ctx, DurableError> {
76        Ctx::start_with_executor(&self.db, name, input, Some(self.executor_id.clone())).await
77    }
78
79    /// Start a new root workflow with a timeout, tagging it with this
80    /// instance's executor_id.
81    pub async fn start_workflow_with_timeout(
82        &self,
83        name: &str,
84        input: Option<serde_json::Value>,
85        timeout_ms: i64,
86    ) -> Result<Ctx, DurableError> {
87        Ctx::start_with_timeout_and_executor(
88            &self.db,
89            name,
90            input,
91            timeout_ms,
92            Some(self.executor_id.clone()),
93        )
94        .await
95    }
96}
97
98/// Initialize durable: connect to Postgres, run migrations, start heartbeat,
99/// and recover stale tasks from prior crashes.
100///
101/// Returns a [`DurableInstance`] which derefs to `DatabaseConnection` for
102/// backwards compatibility.
103///
104/// Uses [`HeartbeatConfig::default()`] (60 s heartbeat, 180 s staleness).
105/// For custom intervals use [`init_with_config`].
106///
107/// ```ignore
108/// let durable = durable::init("postgres://localhost/mydb").await?;
109/// ```
110pub async fn init(database_url: &str) -> Result<DurableInstance, DurableError> {
111    init_with_config(database_url, HeartbeatConfig::default()).await
112}
113
114/// Like [`init`] but with a custom [`HeartbeatConfig`].
115pub async fn init_with_config(
116    database_url: &str,
117    config: HeartbeatConfig,
118) -> Result<DurableInstance, DurableError> {
119    let mut opt = ConnectOptions::new(database_url);
120    // Apply search_path at the pool level so every connection resolves
121    // the `task_status` enum in the `durable` schema.
122    opt.set_schema_search_path("public,durable");
123    let db = Database::connect(opt).await?;
124
125    // Run migrations
126    durable_db::Migrator::up(&db, None).await?;
127
128    // Create executor with a unique ID for this process
129    let executor_id = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
130    let executor = Executor::new(db.clone(), executor_id.clone());
131
132    // Write initial heartbeat so other workers know we're alive
133    executor.heartbeat().await?;
134
135    let mut all_recovered = Vec::new();
136
137    // Recover stale tasks: timeout/deadline-based
138    let recovered = executor.recover().await?;
139    if !recovered.is_empty() {
140        tracing::info!(
141            "recovered {} stale tasks (timeout/deadline)",
142            recovered.len()
143        );
144    }
145    all_recovered.extend(recovered);
146
147    // Recover stale tasks: heartbeat-based (dead workers, unknown executors, orphaned NULL)
148    let recovered = executor
149        .recover_stale_tasks(config.staleness_threshold)
150        .await?;
151    if !recovered.is_empty() {
152        tracing::info!(
153            "recovered {} stale tasks from dead/unknown workers",
154            recovered.len()
155        );
156    }
157    all_recovered.extend(recovered);
158
159    // Start background heartbeat + recovery loops
160    executor.start_heartbeat(&config);
161    executor.start_recovery_loop(&config);
162
163    tracing::info!("durable initialized (executor={})", executor.executor_id());
164    Ok(DurableInstance {
165        db,
166        executor_id,
167        recovered: all_recovered,
168    })
169}
170
171/// Initialize durable: connect to Postgres and run migrations only.
172///
173/// Does **not** start heartbeat or recovery loops. Use this for tests,
174/// migrations-only scripts, or when you manage the [`Executor`] yourself.
175///
176/// ```ignore
177/// let db = durable::init_db("postgres://localhost/mydb").await?;
178/// ```
179pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
180    let mut opt = ConnectOptions::new(database_url);
181    opt.set_schema_search_path("public,durable");
182    let db = Database::connect(opt).await?;
183    durable_db::Migrator::up(&db, None).await?;
184    tracing::info!("durable initialized (db only)");
185    Ok(db)
186}