1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14pub use inventory;
16
17use sea_orm::{ConnectOptions, ConnectionTrait, Database, DatabaseConnection};
18use sea_orm_migration::MigratorTrait;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::RwLock;
22
/// Process-wide executor id, populated by `init_with_config`;
/// `None` until initialization has run.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns this process's executor id.
///
/// Yields `None` when `init`/`init_with_config` has not run yet, or when
/// the lock is poisoned (treated as "no id" rather than panicking).
pub fn executor_id() -> Option<String> {
    match EXECUTOR_ID.read() {
        Ok(guard) => guard.clone(),
        Err(_) => None,
    }
}
31
/// A workflow registered for automatic resumption after recovery.
/// Entries are gathered at link time via the `inventory` crate —
/// presumably emitted by the `#[workflow]` macro re-exported above;
/// confirm in `durable_macros`.
pub struct WorkflowRegistration {
    /// Workflow name; matched against a task's stored name by `find_workflow`.
    pub name: &'static str,
    /// Re-attaches to an existing task context and drives the workflow to
    /// completion; invoked by `dispatch_recovered` when auto-resuming.
    pub resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
}

// Declare the distributed-slice collection point for registrations.
inventory::collect!(WorkflowRegistration);
47
48pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
50 inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
51}
52
53pub async fn init(
70 database_url: &str,
71) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
72 init_with_config(database_url, HeartbeatConfig::default()).await
73}
74
/// Connects, migrates, and boots the durable runtime with an explicit
/// heartbeat configuration.
///
/// Startup sequence (statement order is significant):
/// 1. connect with schema search path `public,durable` and run migrations;
/// 2. generate this process's executor id and publish it via `EXECUTOR_ID`;
/// 3. write an initial heartbeat;
/// 4. run both recovery passes (timeout/deadline, then dead/unknown workers);
/// 5. re-dispatch everything recovered (see `dispatch_recovered`);
/// 6. start the background heartbeat and the periodic recovery loop.
///
/// Returns the live connection plus the tasks recovered during startup.
///
/// # Errors
/// Fails on connection, migration, heartbeat, or recovery errors.
pub async fn init_with_config(
    database_url: &str,
    config: HeartbeatConfig,
) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
    let mut opt = ConnectOptions::new(database_url);
    // Put `durable` on the search path so the crate's tables resolve
    // without schema-qualifying every query.
    opt.set_schema_search_path("public,durable");
    let db = Database::connect(opt).await?;

    durable_db::Migrator::up(&db, None).await?;

    // Unique per process: pid plus a random UUID.
    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
    // A poisoned lock is silently skipped — executor_id() then reports None.
    if let Ok(mut guard) = EXECUTOR_ID.write() {
        *guard = Some(eid.clone());
    }
    let executor = Executor::new(db.clone(), eid);

    // Heartbeat before recovering — NOTE(review): presumably so this worker
    // is seen as alive before the stale-worker pass runs; confirm.
    executor.heartbeat().await?;

    let mut all_recovered = Vec::new();

    // Pass 1: tasks past their timeout/deadline.
    let recovered = executor.recover().await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks (timeout/deadline)",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Pass 2: tasks held by workers whose heartbeat went stale.
    let recovered = executor
        .recover_stale_tasks(config.staleness_threshold)
        .await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks from dead/unknown workers",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Mark recovered tasks RUNNING and spawn their workflow futures.
    dispatch_recovered(&db, &all_recovered).await;

    executor.start_heartbeat(&config);

    // Keep recovering in the background for workers that die later.
    start_recovery_dispatch_loop(
        db.clone(),
        executor.executor_id().to_string(),
        config.staleness_threshold,
    );

    tracing::info!("durable initialized (executor={})", executor.executor_id());
    Ok((db, all_recovered))
}
137
138async fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
140 for task in recovered {
141 let sql = format!(
143 "UPDATE durable.task SET status = 'RUNNING', started_at = now() \
144 WHERE id = '{}' AND status = 'PENDING'",
145 task.id
146 );
147 if let Err(e) = db
148 .execute(sea_orm::Statement::from_string(
149 sea_orm::DbBackend::Postgres,
150 sql,
151 ))
152 .await
153 {
154 tracing::error!(id = %task.id, error = %e, "failed to set recovered task to RUNNING");
155 continue;
156 }
157
158 if task.parent_id.is_some() {
160 continue;
161 }
162
163 if let Some(reg) = find_workflow(&task.name) {
164 let db_inner = db.clone();
165 let task_id = task.id;
166 let task_name = task.name.clone();
167 let resume = reg.resume_fn;
168 tokio::spawn(async move {
169 tracing::info!(
170 workflow = %task_name,
171 id = %task_id,
172 "auto-resuming recovered workflow"
173 );
174 match Ctx::from_id(&db_inner, task_id).await {
175 Ok(ctx) => {
176 if let Err(e) = (resume)(ctx).await {
177 tracing::error!(
178 workflow = %task_name,
179 id = %task_id,
180 error = %e,
181 "recovered workflow failed"
182 );
183 }
184 }
185 Err(e) => {
186 tracing::error!(
187 workflow = %task_name,
188 id = %task_id,
189 error = %e,
190 "failed to attach to recovered workflow"
191 );
192 }
193 }
194 });
195 } else {
196 tracing::warn!(
197 workflow = %task.name,
198 id = %task.id,
199 "no registered handler for recovered task — use Ctx::from_id() to resume manually"
200 );
201 }
202 }
203}
204
205fn start_recovery_dispatch_loop(
207 db: DatabaseConnection,
208 executor_id: String,
209 staleness_threshold: std::time::Duration,
210) {
211 tokio::spawn(async move {
212 let executor = Executor::new(db.clone(), executor_id);
213 let mut ticker = tokio::time::interval(staleness_threshold);
214 loop {
215 ticker.tick().await;
216
217 match executor.recover().await {
219 Ok(ref recovered) if !recovered.is_empty() => {
220 tracing::info!(
221 "recovered {} stale tasks (timeout/deadline)",
222 recovered.len()
223 );
224 dispatch_recovered(&db, recovered).await;
225 }
226 Err(e) => tracing::warn!("timeout recovery failed: {e}"),
227 _ => {}
228 }
229
230 match executor.recover_stale_tasks(staleness_threshold).await {
232 Ok(ref recovered) if !recovered.is_empty() => {
233 tracing::info!(
234 "recovered {} stale tasks from dead workers",
235 recovered.len()
236 );
237 dispatch_recovered(&db, recovered).await;
238 }
239 Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
240 _ => {}
241 }
242 }
243 });
244}
245
246pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
256 let mut opt = ConnectOptions::new(database_url);
257 opt.set_schema_search_path("public,durable");
258 let db = Database::connect(opt).await?;
259 durable_db::Migrator::up(&db, None).await?;
260 tracing::info!("durable initialized (db only)");
261 Ok(db)
262}