1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14pub use inventory;
16
17use sea_orm::{ConnectOptions, Database, DatabaseConnection};
18use sea_orm_migration::MigratorTrait;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::RwLock;
22use uuid::Uuid;
23
/// Process-wide executor identity, assigned once during initialization.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns a clone of the current executor id, or `None` when
/// initialization has not run yet (or the lock was poisoned).
pub fn executor_id() -> Option<String> {
    match EXECUTOR_ID.read() {
        Ok(guard) => guard.clone(),
        Err(_) => None,
    }
}
32
/// Inventory-registered entry linking a workflow's name to the function
/// used to (re-)run it after recovery or a manual resume.
pub struct WorkflowRegistration {
    /// Registration name used for lookup (see `find_workflow`).
    pub name: &'static str,
    /// Entry point invoked with a reconstructed `Ctx` to execute the workflow.
    pub resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
}
47
48inventory::collect!(WorkflowRegistration);
49
50pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
52 inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
53}
54
55pub async fn resume_workflow(
71 db: &DatabaseConnection,
72 task_id: uuid::Uuid,
73) -> Result<(), DurableError> {
74 let handler = Ctx::resume_failed(db, task_id).await?;
75
76 let lookup_key = handler.as_deref().unwrap_or("");
78 if let Some(reg) = find_workflow(lookup_key) {
79 let db_inner = db.clone();
80 let resume = reg.resume_fn;
81 tokio::spawn(async move {
82 tracing::info!(
83 id = %task_id,
84 "re-dispatching resumed workflow"
85 );
86 run_workflow_with_recovery(db_inner, task_id, resume).await;
87 });
88 } else {
89 tracing::warn!(
90 id = %task_id,
91 handler = ?handler,
92 "no registered handler for resumed workflow — use Ctx::from_id() to resume manually"
93 );
94 }
95
96 Ok(())
97}
98
99pub async fn init(
114 database_url: &str,
115) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
116 init_with_config(database_url, HeartbeatConfig::default()).await
117}
118
/// Full initialization: connects, migrates, registers this process as an
/// executor, recovers stale work, and starts the background heartbeat and
/// recovery loops.
///
/// Returns the live connection together with the tasks recovered at startup.
pub async fn init_with_config(
    database_url: &str,
    config: HeartbeatConfig,
) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
    // Unqualified table names resolve against `public` first, then `durable`.
    let mut opt = ConnectOptions::new(database_url);
    opt.set_schema_search_path("public,durable");
    let db = Database::connect(opt).await?;

    // Apply any pending migrations before touching durable tables.
    durable_db::Migrator::up(&db, None).await?;

    // Unique per-process identity; published through EXECUTOR_ID so that
    // `executor_id()` can report it.
    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
    if let Ok(mut guard) = EXECUTOR_ID.write() {
        *guard = Some(eid.clone());
    }
    let executor = Executor::new(db.clone(), eid);

    // Record an initial heartbeat before running the recovery passes.
    executor.heartbeat().await?;

    let mut all_recovered = Vec::new();

    // Recovery pass 1: tasks past their timeout/deadline.
    let recovered = executor.recover().await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks (timeout/deadline)",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Recovery pass 2: tasks held by workers whose heartbeat went stale.
    let recovered = executor
        .recover_stale_tasks(config.staleness_threshold)
        .await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks from dead/unknown workers",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Reset steps orphaned by the recovered tasks; a failure here is logged
    // but does not abort initialization.
    if !all_recovered.is_empty() {
        let recovered_ids: Vec<Uuid> = all_recovered.iter().map(|r| r.id).collect();
        match executor.reset_orphaned_steps(&recovered_ids).await {
            Ok(n) if n > 0 => {
                tracing::info!("reset {n} orphaned steps to PENDING");
            }
            Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
            _ => {}
        }
    }

    // Re-dispatch recovered root workflows on background tasks.
    dispatch_recovered(&db, &all_recovered);

    // Background loops: periodic heartbeat, then periodic recovery/dispatch.
    executor.start_heartbeat(&config);

    start_recovery_dispatch_loop(
        db.clone(),
        executor.executor_id().to_string(),
        config.staleness_threshold,
    );

    tracing::info!("durable initialized (executor={})", executor.executor_id());
    Ok((db, all_recovered))
}
193
/// Delay between a successful failure-reset and the next re-execution
/// attempt in `run_workflow_with_recovery`.
const RECOVERY_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
196
197async fn run_workflow_with_recovery(
206 db: DatabaseConnection,
207 task_id: Uuid,
208 resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
209) {
210 loop {
211 match Ctx::from_id(&db, task_id).await {
212 Ok(ctx) => {
213 match (resume_fn)(ctx).await {
214 Ok(()) => return, Err(e) => {
216 if let Err(fail_err) = Ctx::fail_by_id(&db, task_id, &e.to_string()).await
218 {
219 tracing::error!(
220 id = %task_id,
221 error = %fail_err,
222 "failed to mark workflow as FAILED"
223 );
224 return;
225 }
226
227 tracing::warn!(
228 id = %task_id,
229 error = %e,
230 "workflow failed, attempting automatic recovery"
231 );
232
233 match Ctx::resume_failed(&db, task_id).await {
235 Ok(_) => {
236 tracing::info!(
237 id = %task_id,
238 "workflow auto-recovery: reset succeeded, re-executing"
239 );
240 tokio::time::sleep(RECOVERY_BACKOFF).await;
241 continue; }
243 Err(DurableError::MaxRecoveryExceeded(_)) => {
244 tracing::error!(
245 id = %task_id,
246 "workflow exceeded max recovery attempts, staying FAILED"
247 );
248 return;
249 }
250 Err(recover_err) => {
251 tracing::error!(
252 id = %task_id,
253 error = %recover_err,
254 "workflow auto-recovery failed"
255 );
256 return;
257 }
258 }
259 }
260 }
261 }
262 Err(e) => {
263 tracing::error!(
264 id = %task_id,
265 error = %e,
266 "failed to attach to workflow"
267 );
268 return;
269 }
270 }
271 }
272}
273
274fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
277 for task in recovered {
278 if task.parent_id.is_some() {
280 continue;
281 }
282
283 let lookup_key = task.handler.as_deref().unwrap_or(&task.name);
284 if let Some(reg) = find_workflow(lookup_key) {
285 let db_inner = db.clone();
286 let task_id = task.id;
287 let task_name = task.name.clone();
288 let resume = reg.resume_fn;
289 tokio::spawn(async move {
290 tracing::info!(
291 workflow = %task_name,
292 id = %task_id,
293 "auto-resuming recovered workflow"
294 );
295 run_workflow_with_recovery(db_inner, task_id, resume).await;
296 });
297 } else {
298 tracing::warn!(
299 workflow = %task.name,
300 handler = ?task.handler,
301 id = %task.id,
302 "no registered handler for recovered task — use Ctx::from_id() to resume manually"
303 );
304 }
305 }
306}
307
308fn start_recovery_dispatch_loop(
310 db: DatabaseConnection,
311 executor_id: String,
312 staleness_threshold: std::time::Duration,
313) {
314 tokio::spawn(async move {
315 let executor = Executor::new(db.clone(), executor_id);
316 let mut ticker = tokio::time::interval(staleness_threshold);
317 loop {
318 ticker.tick().await;
319
320 let mut all_recovered_ids = Vec::new();
322
323 match executor.recover().await {
325 Ok(ref recovered) if !recovered.is_empty() => {
326 tracing::info!(
327 "recovered {} stale tasks (timeout/deadline)",
328 recovered.len()
329 );
330 all_recovered_ids.extend(recovered.iter().map(|r| r.id));
331 dispatch_recovered(&db, recovered);
332 }
333 Err(e) => tracing::warn!("timeout recovery failed: {e}"),
334 _ => {}
335 }
336
337 match executor.recover_stale_tasks(staleness_threshold).await {
339 Ok(ref recovered) if !recovered.is_empty() => {
340 tracing::info!(
341 "recovered {} stale tasks from dead workers",
342 recovered.len()
343 );
344 all_recovered_ids.extend(recovered.iter().map(|r| r.id));
345 dispatch_recovered(&db, recovered);
346 }
347 Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
348 _ => {}
349 }
350
351 if !all_recovered_ids.is_empty() {
356 match executor.reset_orphaned_steps(&all_recovered_ids).await {
357 Ok(n) if n > 0 => {
358 tracing::info!("reset {n} orphaned steps to PENDING");
359 }
360 Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
361 _ => {}
362 }
363 }
364 }
365 });
366}
367
368pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
378 let mut opt = ConnectOptions::new(database_url);
379 opt.set_schema_search_path("public,durable");
380 let db = Database::connect(opt).await?;
381 durable_db::Migrator::up(&db, None).await?;
382 tracing::info!("durable initialized (db only)");
383 Ok(db)
384}