1pub mod ctx;
2pub mod error;
3pub mod executor;
4pub mod scheduler;
5
6pub use ctx::Ctx;
7pub use ctx::RetryPolicy;
8pub use ctx::StartResult;
9pub use ctx::{TaskQuery, TaskSort, TaskSummary};
10pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
11pub use durable_macros::{step, workflow};
12pub use error::DurableError;
13pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
14pub use scheduler::{SchedulerConfig, next_run};
15pub use sea_orm::DatabaseTransaction;
16
17pub use inventory;
19
20use sea_orm::{ConnectOptions, Database, DatabaseConnection};
21use sea_orm_migration::MigratorTrait;
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::RwLock;
25use uuid::Uuid;
26
/// Executor id for this process; set once during `init_with_config`.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns the executor id assigned during `init_with_config`, or
/// `None` when durable has not been initialized in this process.
pub fn executor_id() -> Option<String> {
    // A poisoned lock only means some thread panicked while holding the
    // guard; the stored Option<String> is still valid, so recover the
    // guard instead of conflating "poisoned" with "never set" (the old
    // `.ok()` made this function return None after a panic elsewhere).
    let guard = EXECUTOR_ID
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    guard.clone()
}
35
/// Signature of a registered workflow entry point: given a resumed
/// `Ctx`, returns a boxed future that drives the workflow to completion
/// (or to a `DurableError`).
pub type ResumeFn = fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>;

/// One entry in the global workflow registry, collected via `inventory`
/// (presumably submitted by the `#[workflow]` macro — confirm in
/// durable_macros).
pub struct WorkflowRegistration {
    /// Lookup key; matched against a task's stored handler (or name).
    pub name: &'static str,
    /// Entry point invoked to (re)run the workflow.
    pub resume_fn: ResumeFn,
}

// Gather every WorkflowRegistration submitted anywhere in the binary.
inventory::collect!(WorkflowRegistration);
55
56pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
58 inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
59}
60
/// Resets a FAILED task back to a runnable state and, when its stored
/// handler is registered, re-dispatches it on a detached tokio task.
///
/// Returns `Ok(())` once the reset succeeds even if no handler is
/// registered — that case is only logged, and the caller can resume
/// manually via `Ctx::from_id()`.
///
/// # Errors
/// Propagates any error from `Ctx::resume_failed` (the reset itself).
pub async fn resume_workflow(
    db: &DatabaseConnection,
    task_id: uuid::Uuid,
) -> Result<(), DurableError> {
    // Reset the task first; yields the handler name stored with it.
    let handler = Ctx::resume_failed(db, task_id).await?;

    // NOTE(review): when no handler was stored this looks up "", which
    // only matches a registration literally named "" — unlike
    // dispatch_recovered() there is no fallback to the task name here.
    let lookup_key = handler.as_deref().unwrap_or("");
    if let Some(reg) = find_workflow(lookup_key) {
        let db_inner = db.clone();
        let resume = reg.resume_fn;
        // Detached: success/failure is handled inside
        // run_workflow_with_recovery, not reported to this caller.
        tokio::spawn(async move {
            tracing::info!(
                id = %task_id,
                "re-dispatching resumed workflow"
            );
            run_workflow_with_recovery(db_inner, task_id, resume).await;
        });
    } else {
        tracing::warn!(
            id = %task_id,
            handler = ?handler,
            "no registered handler for resumed workflow — use Ctx::from_id() to resume manually"
        );
    }

    Ok(())
}
104
105pub async fn init(
120 database_url: &str,
121) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
122 init_with_config(database_url, HeartbeatConfig::default()).await
123}
124
/// Initializes durable: connects and migrates, registers this process
/// as an executor, recovers stale tasks, and starts the background
/// heartbeat, recovery-dispatch, and scheduler loops.
///
/// Returns the live connection plus every task recovered at startup
/// (those with a registered handler are already re-dispatched).
///
/// # Errors
/// Fails if the connection, migration, initial heartbeat, or either
/// recovery pass fails.
pub async fn init_with_config(
    database_url: &str,
    config: HeartbeatConfig,
) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
    let mut opt = ConnectOptions::new(database_url);
    // Resolve app tables in "public" first, durable's own in "durable".
    opt.set_schema_search_path("public,durable");
    let db = Database::connect(opt).await?;

    // Bring the durable schema up to date before touching any tables.
    durable_db::Migrator::up(&db, None).await?;

    // Executor identity is unique per process and per init call.
    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
    // NOTE(review): a poisoned lock silently skips storing the id,
    // leaving executor_id() returning None in this process — confirm
    // that is acceptable.
    if let Ok(mut guard) = EXECUTOR_ID.write() {
        *guard = Some(eid.clone());
    }
    let executor = Executor::new(db.clone(), eid);

    // Record an initial heartbeat before any recovery runs.
    executor.heartbeat().await?;

    let mut all_recovered = Vec::new();

    // Pass 1: tasks whose own timeout/deadline has expired.
    let recovered = executor.recover().await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks (timeout/deadline)",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Pass 2: tasks held by workers whose heartbeat went stale.
    let recovered = executor
        .recover_stale_tasks(config.staleness_threshold)
        .await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks from dead/unknown workers",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Reset steps orphaned by the recovered tasks back to PENDING;
    // a failure here is logged but deliberately not fatal.
    if !all_recovered.is_empty() {
        let recovered_ids: Vec<Uuid> = all_recovered.iter().map(|r| r.id).collect();
        match executor.reset_orphaned_steps(&recovered_ids).await {
            Ok(n) if n > 0 => {
                tracing::info!("reset {n} orphaned steps to PENDING");
            }
            Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
            _ => {}
        }
    }

    // Re-dispatch recovered top-level tasks that have a registered handler.
    dispatch_recovered(&db, &all_recovered);

    // Detach the heartbeat task; the JoinHandle is deliberately dropped
    // (presumably the loop runs until process exit — TODO confirm).
    drop(executor.start_heartbeat(&config));

    // Background loop re-running both recovery passes periodically.
    start_recovery_dispatch_loop(
        db.clone(),
        executor.executor_id().to_string(),
        config.staleness_threshold,
    );

    // Scheduler loop for time-based task dispatch.
    scheduler::start_scheduler_loop(
        db.clone(),
        executor.executor_id().to_string(),
        &SchedulerConfig::default(),
    );

    tracing::info!("durable initialized (executor={})", executor.executor_id());
    Ok((db, all_recovered))
}
208
209const RECOVERY_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
211
212async fn run_workflow_with_recovery(db: DatabaseConnection, task_id: Uuid, resume_fn: ResumeFn) {
221 loop {
222 match Ctx::from_id(&db, task_id).await {
223 Ok(ctx) => {
224 let result = tokio::task::spawn(async move { (resume_fn)(ctx).await }).await;
226
227 let err_msg = match result {
228 Ok(Ok(())) => return, Ok(Err(e)) => e.to_string(),
230 Err(join_err) => {
231 if join_err.is_panic() {
233 let panic_msg = match join_err.into_panic().downcast::<String>() {
234 Ok(msg) => *msg,
235 Err(payload) => match payload.downcast::<&str>() {
236 Ok(msg) => msg.to_string(),
237 Err(_) => "unknown panic".to_string(),
238 },
239 };
240 format!("panic: {panic_msg}")
241 } else {
242 "task cancelled".to_string()
243 }
244 }
245 };
246
247 if let Err(fail_err) = Ctx::fail_by_id(&db, task_id, &err_msg).await {
249 tracing::error!(
250 id = %task_id,
251 error = %fail_err,
252 "failed to mark workflow as FAILED"
253 );
254 return;
255 }
256
257 tracing::warn!(
258 id = %task_id,
259 error = %err_msg,
260 "workflow failed, attempting automatic recovery"
261 );
262
263 match Ctx::resume_failed(&db, task_id).await {
265 Ok(_) => {
266 tracing::info!(
267 id = %task_id,
268 "workflow auto-recovery: reset succeeded, re-executing"
269 );
270 tokio::time::sleep(RECOVERY_BACKOFF).await;
271 continue; }
273 Err(DurableError::MaxRecoveryExceeded(_)) => {
274 tracing::error!(
275 id = %task_id,
276 "workflow exceeded max recovery attempts, staying FAILED"
277 );
278 return;
279 }
280 Err(recover_err) => {
281 tracing::error!(
282 id = %task_id,
283 error = %recover_err,
284 "workflow auto-recovery failed"
285 );
286 return;
287 }
288 }
289 }
290 Err(e) => {
291 tracing::error!(
292 id = %task_id,
293 error = %e,
294 "failed to attach to workflow"
295 );
296 return;
297 }
298 }
299 }
300}
301
302fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
305 for task in recovered {
306 if task.parent_id.is_some() {
308 continue;
309 }
310
311 let lookup_key = task.handler.as_deref().unwrap_or(&task.name);
312 if let Some(reg) = find_workflow(lookup_key) {
313 let db_inner = db.clone();
314 let task_id = task.id;
315 let task_name = task.name.clone();
316 let resume = reg.resume_fn;
317 tokio::spawn(async move {
318 tracing::info!(
319 workflow = %task_name,
320 id = %task_id,
321 "auto-resuming recovered workflow"
322 );
323 run_workflow_with_recovery(db_inner, task_id, resume).await;
324 });
325 } else {
326 tracing::warn!(
327 workflow = %task.name,
328 handler = ?task.handler,
329 id = %task.id,
330 "no registered handler for recovered task — use Ctx::from_id() to resume manually"
331 );
332 }
333 }
334}
335
/// Spawns a detached loop that re-runs both recovery passes every
/// `staleness_threshold` and re-dispatches anything reclaimed.
///
/// Mirrors the startup recovery in `init_with_config`, except errors
/// here are only logged — the loop never exits on its own.
fn start_recovery_dispatch_loop(
    db: DatabaseConnection,
    executor_id: String,
    staleness_threshold: std::time::Duration,
) {
    tokio::spawn(async move {
        let executor = Executor::new(db.clone(), executor_id);
        // Tick at the same cadence used to judge staleness.
        let mut ticker = tokio::time::interval(staleness_threshold);
        // Skip (rather than burst) ticks missed while a pass runs long.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;

            // Ids from both passes, pooled for one orphaned-step reset.
            let mut all_recovered_ids = Vec::new();

            // Pass 1: tasks whose own timeout/deadline has expired.
            match executor.recover().await {
                Ok(ref recovered) if !recovered.is_empty() => {
                    tracing::info!(
                        "recovered {} stale tasks (timeout/deadline)",
                        recovered.len()
                    );
                    all_recovered_ids.extend(recovered.iter().map(|r| r.id));
                    dispatch_recovered(&db, recovered);
                }
                Err(e) => tracing::warn!("timeout recovery failed: {e}"),
                _ => {}
            }

            // Pass 2: tasks held by workers whose heartbeat went stale.
            match executor.recover_stale_tasks(staleness_threshold).await {
                Ok(ref recovered) if !recovered.is_empty() => {
                    tracing::info!(
                        "recovered {} stale tasks from dead workers",
                        recovered.len()
                    );
                    all_recovered_ids.extend(recovered.iter().map(|r| r.id));
                    dispatch_recovered(&db, recovered);
                }
                Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
                _ => {}
            }

            // Reset steps orphaned by the reclaimed tasks back to PENDING.
            if !all_recovered_ids.is_empty() {
                match executor.reset_orphaned_steps(&all_recovered_ids).await {
                    Ok(n) if n > 0 => {
                        tracing::info!("reset {n} orphaned steps to PENDING");
                    }
                    Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
                    _ => {}
                }
            }
        }
    });
}
396
397pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
407 let mut opt = ConnectOptions::new(database_url);
408 opt.set_schema_search_path("public,durable");
409 let db = Database::connect(opt).await?;
410 durable_db::Migrator::up(&db, None).await?;
411 tracing::info!("durable initialized (db only)");
412 Ok(db)
413}