1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14pub use inventory;
16
17use sea_orm::{ConnectOptions, Database, DatabaseConnection};
18use sea_orm_migration::MigratorTrait;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::RwLock;
22use uuid::Uuid;
23
/// Id of the executor registered by `init` / `init_with_config` in this
/// process; `None` until one of them has published it.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns the id of this process's executor, or `None` if `init` /
/// `init_with_config` has not published one yet.
///
/// A poisoned lock is read through rather than reported as "no executor".
/// The lock only guards an `Option<String>` that is replaced by a single
/// assignment, so a writer that panicked cannot have left it half-updated.
pub fn executor_id() -> Option<String> {
    EXECUTOR_ID
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}
32
33pub struct WorkflowRegistration {
40 pub name: &'static str,
43 pub resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
46}
47
48inventory::collect!(WorkflowRegistration);
49
50pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
52 inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
53}
54
55pub async fn resume_workflow(
71 db: &DatabaseConnection,
72 task_id: uuid::Uuid,
73) -> Result<(), DurableError> {
74 let handler = Ctx::resume_failed(db, task_id).await?;
75
76 let lookup_key = handler.as_deref().unwrap_or("");
78 if let Some(reg) = find_workflow(lookup_key) {
79 let db_inner = db.clone();
80 let resume = reg.resume_fn;
81 tokio::spawn(async move {
82 tracing::info!(
83 id = %task_id,
84 "re-dispatching resumed workflow"
85 );
86 run_workflow_with_recovery(db_inner, task_id, resume).await;
87 });
88 } else {
89 tracing::warn!(
90 id = %task_id,
91 handler = ?handler,
92 "no registered handler for resumed workflow — use Ctx::from_id() to resume manually"
93 );
94 }
95
96 Ok(())
97}
98
99pub async fn init(
114 database_url: &str,
115) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
116 init_with_config(database_url, HeartbeatConfig::default()).await
117}
118
119pub async fn init_with_config(
121 database_url: &str,
122 config: HeartbeatConfig,
123) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
124 let mut opt = ConnectOptions::new(database_url);
125 opt.set_schema_search_path("public,durable");
126 let db = Database::connect(opt).await?;
127
128 durable_db::Migrator::up(&db, None).await?;
130
131 let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
133 if let Ok(mut guard) = EXECUTOR_ID.write() {
134 *guard = Some(eid.clone());
135 }
136 let executor = Executor::new(db.clone(), eid);
137
138 executor.heartbeat().await?;
140
141 let mut all_recovered = Vec::new();
142
143 let recovered = executor.recover().await?;
145 if !recovered.is_empty() {
146 tracing::info!(
147 "recovered {} stale tasks (timeout/deadline)",
148 recovered.len()
149 );
150 }
151 all_recovered.extend(recovered);
152
153 let recovered = executor
155 .recover_stale_tasks(config.staleness_threshold)
156 .await?;
157 if !recovered.is_empty() {
158 tracing::info!(
159 "recovered {} stale tasks from dead/unknown workers",
160 recovered.len()
161 );
162 }
163 all_recovered.extend(recovered);
164
165 if !all_recovered.is_empty() {
167 let recovered_ids: Vec<Uuid> = all_recovered.iter().map(|r| r.id).collect();
168 match executor.reset_orphaned_steps(&recovered_ids).await {
169 Ok(n) if n > 0 => {
170 tracing::info!("reset {n} orphaned steps to PENDING");
171 }
172 Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
173 _ => {}
174 }
175 }
176
177 dispatch_recovered(&db, &all_recovered);
179
180 executor.start_heartbeat(&config);
182
183 start_recovery_dispatch_loop(
185 db.clone(),
186 executor.executor_id().to_string(),
187 config.staleness_threshold,
188 );
189
190 tracing::info!("durable initialized (executor={})", executor.executor_id());
191 Ok((db, all_recovered))
192}
193
194const RECOVERY_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
196
197async fn run_workflow_with_recovery(
206 db: DatabaseConnection,
207 task_id: Uuid,
208 resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
209) {
210 loop {
211 match Ctx::from_id(&db, task_id).await {
212 Ok(ctx) => {
213 let result = tokio::task::spawn(async move { (resume_fn)(ctx).await }).await;
215
216 let err_msg = match result {
217 Ok(Ok(())) => return, Ok(Err(e)) => e.to_string(),
219 Err(join_err) => {
220 if join_err.is_panic() {
222 let panic_msg = match join_err.into_panic().downcast::<String>() {
223 Ok(msg) => *msg,
224 Err(payload) => match payload.downcast::<&str>() {
225 Ok(msg) => msg.to_string(),
226 Err(_) => "unknown panic".to_string(),
227 },
228 };
229 format!("panic: {panic_msg}")
230 } else {
231 "task cancelled".to_string()
232 }
233 }
234 };
235
236 if let Err(fail_err) =
238 Ctx::fail_by_id(&db, task_id, &err_msg).await
239 {
240 tracing::error!(
241 id = %task_id,
242 error = %fail_err,
243 "failed to mark workflow as FAILED"
244 );
245 return;
246 }
247
248 tracing::warn!(
249 id = %task_id,
250 error = %err_msg,
251 "workflow failed, attempting automatic recovery"
252 );
253
254 match Ctx::resume_failed(&db, task_id).await {
256 Ok(_) => {
257 tracing::info!(
258 id = %task_id,
259 "workflow auto-recovery: reset succeeded, re-executing"
260 );
261 tokio::time::sleep(RECOVERY_BACKOFF).await;
262 continue; }
264 Err(DurableError::MaxRecoveryExceeded(_)) => {
265 tracing::error!(
266 id = %task_id,
267 "workflow exceeded max recovery attempts, staying FAILED"
268 );
269 return;
270 }
271 Err(recover_err) => {
272 tracing::error!(
273 id = %task_id,
274 error = %recover_err,
275 "workflow auto-recovery failed"
276 );
277 return;
278 }
279 }
280 }
281 Err(e) => {
282 tracing::error!(
283 id = %task_id,
284 error = %e,
285 "failed to attach to workflow"
286 );
287 return;
288 }
289 }
290 }
291}
292
293fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
296 for task in recovered {
297 if task.parent_id.is_some() {
299 continue;
300 }
301
302 let lookup_key = task.handler.as_deref().unwrap_or(&task.name);
303 if let Some(reg) = find_workflow(lookup_key) {
304 let db_inner = db.clone();
305 let task_id = task.id;
306 let task_name = task.name.clone();
307 let resume = reg.resume_fn;
308 tokio::spawn(async move {
309 tracing::info!(
310 workflow = %task_name,
311 id = %task_id,
312 "auto-resuming recovered workflow"
313 );
314 run_workflow_with_recovery(db_inner, task_id, resume).await;
315 });
316 } else {
317 tracing::warn!(
318 workflow = %task.name,
319 handler = ?task.handler,
320 id = %task.id,
321 "no registered handler for recovered task — use Ctx::from_id() to resume manually"
322 );
323 }
324 }
325}
326
327fn start_recovery_dispatch_loop(
329 db: DatabaseConnection,
330 executor_id: String,
331 staleness_threshold: std::time::Duration,
332) {
333 tokio::spawn(async move {
334 let executor = Executor::new(db.clone(), executor_id);
335 let mut ticker = tokio::time::interval(staleness_threshold);
336 loop {
337 ticker.tick().await;
338
339 let mut all_recovered_ids = Vec::new();
341
342 match executor.recover().await {
344 Ok(ref recovered) if !recovered.is_empty() => {
345 tracing::info!(
346 "recovered {} stale tasks (timeout/deadline)",
347 recovered.len()
348 );
349 all_recovered_ids.extend(recovered.iter().map(|r| r.id));
350 dispatch_recovered(&db, recovered);
351 }
352 Err(e) => tracing::warn!("timeout recovery failed: {e}"),
353 _ => {}
354 }
355
356 match executor.recover_stale_tasks(staleness_threshold).await {
358 Ok(ref recovered) if !recovered.is_empty() => {
359 tracing::info!(
360 "recovered {} stale tasks from dead workers",
361 recovered.len()
362 );
363 all_recovered_ids.extend(recovered.iter().map(|r| r.id));
364 dispatch_recovered(&db, recovered);
365 }
366 Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
367 _ => {}
368 }
369
370 if !all_recovered_ids.is_empty() {
375 match executor.reset_orphaned_steps(&all_recovered_ids).await {
376 Ok(n) if n > 0 => {
377 tracing::info!("reset {n} orphaned steps to PENDING");
378 }
379 Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
380 _ => {}
381 }
382 }
383 }
384 });
385}
386
387pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
397 let mut opt = ConnectOptions::new(database_url);
398 opt.set_schema_search_path("public,durable");
399 let db = Database::connect(opt).await?;
400 durable_db::Migrator::up(&db, None).await?;
401 tracing::info!("durable initialized (db only)");
402 Ok(db)
403}