1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14use sea_orm::{ConnectOptions, ConnectionTrait, Database, DatabaseConnection};
15use sea_orm_migration::MigratorTrait;
16use std::sync::RwLock;
17
18static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);
21
22pub fn executor_id() -> Option<String> {
25 EXECUTOR_ID.read().ok().and_then(|g| g.clone())
26}
27
28pub async fn init(
51 database_url: &str,
52) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
53 init_with_config(database_url, HeartbeatConfig::default()).await
54}
55
56pub async fn init_with_config(
58 database_url: &str,
59 config: HeartbeatConfig,
60) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
61 let mut opt = ConnectOptions::new(database_url);
62 opt.set_schema_search_path("public,durable");
63 let db = Database::connect(opt).await?;
64
65 durable_db::Migrator::up(&db, None).await?;
67
68 let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
70 if let Ok(mut guard) = EXECUTOR_ID.write() {
72 *guard = Some(eid.clone());
73 }
74 let executor = Executor::new(db.clone(), eid);
75
76 executor.heartbeat().await?;
78
79 let mut all_recovered = Vec::new();
80
81 let recovered = executor.recover().await?;
83 if !recovered.is_empty() {
84 tracing::info!(
85 "recovered {} stale tasks (timeout/deadline)",
86 recovered.len()
87 );
88 }
89 all_recovered.extend(recovered);
90
91 let recovered = executor
93 .recover_stale_tasks(config.staleness_threshold)
94 .await?;
95 if !recovered.is_empty() {
96 tracing::info!(
97 "recovered {} stale tasks from dead/unknown workers",
98 recovered.len()
99 );
100 }
101 all_recovered.extend(recovered);
102
103 for task in &all_recovered {
105 let sql = format!(
106 "UPDATE durable.task SET status = 'RUNNING', started_at = now() \
107 WHERE id = '{}' AND status = 'PENDING'",
108 task.id
109 );
110 db.execute(sea_orm::Statement::from_string(
111 sea_orm::DbBackend::Postgres,
112 sql,
113 ))
114 .await?;
115 }
116
117 executor.start_heartbeat(&config);
119 executor.start_recovery_loop(&config);
120
121 tracing::info!("durable initialized (executor={})", executor.executor_id());
122 Ok((db, all_recovered))
123}
124
125pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
135 let mut opt = ConnectOptions::new(database_url);
136 opt.set_schema_search_path("public,durable");
137 let db = Database::connect(opt).await?;
138 durable_db::Migrator::up(&db, None).await?;
139 tracing::info!("durable initialized (db only)");
140 Ok(db)
141}