Skip to main content

durable/
lib.rs

1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14use sea_orm::{ConnectOptions, Database, DatabaseConnection};
15use sea_orm_migration::MigratorTrait;
16
17/// Initialize durable: connect to Postgres, run migrations, start heartbeat,
18/// and recover stale tasks from prior crashes.
19///
20/// Uses [`HeartbeatConfig::default()`] (60 s heartbeat, 180 s staleness).
21/// For custom intervals use [`init_with_config`].
22///
23/// ```ignore
24/// let db = durable::init("postgres://localhost/mydb").await?;
25/// ```
26pub async fn init(database_url: &str) -> Result<DatabaseConnection, DurableError> {
27    init_with_config(database_url, HeartbeatConfig::default()).await
28}
29
30/// Like [`init`] but with a custom [`HeartbeatConfig`].
31pub async fn init_with_config(
32    database_url: &str,
33    config: HeartbeatConfig,
34) -> Result<DatabaseConnection, DurableError> {
35    let mut opt = ConnectOptions::new(database_url);
36    // Apply search_path at the pool level so every connection resolves
37    // the `task_status` enum in the `durable` schema.
38    opt.set_schema_search_path("public,durable");
39    let db = Database::connect(opt).await?;
40
41    // Run migrations
42    durable_db::Migrator::up(&db, None).await?;
43
44    // Create executor with a unique ID for this process
45    let executor_id = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
46    let executor = Executor::new(db.clone(), executor_id);
47
48    // Write initial heartbeat so other workers know we're alive
49    executor.heartbeat().await?;
50
51    // Recover stale tasks: timeout/deadline-based
52    let recovered = executor.recover().await?;
53    if !recovered.is_empty() {
54        tracing::info!(
55            "recovered {} stale tasks (timeout/deadline)",
56            recovered.len()
57        );
58    }
59
60    // Recover stale tasks: heartbeat-based (from dead workers)
61    let count = executor
62        .recover_stale_tasks(config.staleness_threshold)
63        .await?;
64    if count > 0 {
65        tracing::info!("recovered {count} stale tasks from dead workers");
66    }
67
68    // Start background heartbeat + recovery loops
69    executor.start_heartbeat(&config);
70    executor.start_recovery_loop(&config);
71
72    tracing::info!("durable initialized (executor={})", executor.executor_id());
73    Ok(db)
74}
75
76/// Initialize durable: connect to Postgres and run migrations only.
77///
78/// Does **not** start heartbeat or recovery loops. Use this for tests,
79/// migrations-only scripts, or when you manage the [`Executor`] yourself.
80///
81/// ```ignore
82/// let db = durable::init_db("postgres://localhost/mydb").await?;
83/// ```
84pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
85    let mut opt = ConnectOptions::new(database_url);
86    opt.set_schema_search_path("public,durable");
87    let db = Database::connect(opt).await?;
88    durable_db::Migrator::up(&db, None).await?;
89    tracing::info!("durable initialized (db only)");
90    Ok(db)
91}