1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14pub use inventory;
16
17use sea_orm::{ConnectOptions, Database, DatabaseConnection};
18use sea_orm_migration::MigratorTrait;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::RwLock;
22use uuid::Uuid;
23
/// Process-wide executor id.
///
/// Starts out as `None`; `init_with_config` stores the generated id here.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns a copy of the executor id currently stored in [`EXECUTOR_ID`].
///
/// Yields `None` when no id has been stored yet, and also when the lock
/// cannot be acquired for reading (i.e. it is poisoned).
pub fn executor_id() -> Option<String> {
    match EXECUTOR_ID.read() {
        Ok(stored) => (*stored).clone(),
        Err(_) => None,
    }
}
32
/// A workflow entry gathered through the `inventory` crate.
///
/// Entries are looked up by `name` in [`find_workflow`]. `dispatch_recovered`
/// uses the match to call `resume_fn` for a recovered task.
pub struct WorkflowRegistration {
    /// Key that [`find_workflow`] compares against the requested name.
    pub name: &'static str,
    /// Function that takes a [`Ctx`] and returns a boxed, `Send` future
    /// resolving to `Result<(), DurableError>`.
    pub resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
}

// Declares `WorkflowRegistration` as a type collected by `inventory`, which is
// what makes `inventory::iter::<WorkflowRegistration>()` usable.
inventory::collect!(WorkflowRegistration);
49
50pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
52 inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
53}
54
55pub async fn init(
72 database_url: &str,
73) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
74 init_with_config(database_url, HeartbeatConfig::default()).await
75}
76
77pub async fn init_with_config(
79 database_url: &str,
80 config: HeartbeatConfig,
81) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
82 let mut opt = ConnectOptions::new(database_url);
83 opt.set_schema_search_path("public,durable");
84 let db = Database::connect(opt).await?;
85
86 durable_db::Migrator::up(&db, None).await?;
88
89 let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
91 if let Ok(mut guard) = EXECUTOR_ID.write() {
92 *guard = Some(eid.clone());
93 }
94 let executor = Executor::new(db.clone(), eid);
95
96 executor.heartbeat().await?;
98
99 let mut all_recovered = Vec::new();
100
101 let recovered = executor.recover().await?;
103 if !recovered.is_empty() {
104 tracing::info!(
105 "recovered {} stale tasks (timeout/deadline)",
106 recovered.len()
107 );
108 }
109 all_recovered.extend(recovered);
110
111 let recovered = executor
113 .recover_stale_tasks(config.staleness_threshold)
114 .await?;
115 if !recovered.is_empty() {
116 tracing::info!(
117 "recovered {} stale tasks from dead/unknown workers",
118 recovered.len()
119 );
120 }
121 all_recovered.extend(recovered);
122
123 if !all_recovered.is_empty() {
125 let recovered_ids: Vec<Uuid> = all_recovered.iter().map(|r| r.id).collect();
126 match executor.reset_orphaned_steps(&recovered_ids).await {
127 Ok(n) if n > 0 => {
128 tracing::info!("reset {n} orphaned steps to PENDING");
129 }
130 Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
131 _ => {}
132 }
133 }
134
135 dispatch_recovered(&db, &all_recovered);
137
138 executor.start_heartbeat(&config);
140
141 start_recovery_dispatch_loop(
143 db.clone(),
144 executor.executor_id().to_string(),
145 config.staleness_threshold,
146 );
147
148 tracing::info!("durable initialized (executor={})", executor.executor_id());
149 Ok((db, all_recovered))
150}
151
152fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
155 for task in recovered {
156 if task.parent_id.is_some() {
158 continue;
159 }
160
161 let lookup_key = task.handler.as_deref().unwrap_or(&task.name);
162 if let Some(reg) = find_workflow(lookup_key) {
163 let db_inner = db.clone();
164 let task_id = task.id;
165 let task_name = task.name.clone();
166 let resume = reg.resume_fn;
167 tokio::spawn(async move {
168 tracing::info!(
169 workflow = %task_name,
170 id = %task_id,
171 "auto-resuming recovered workflow"
172 );
173 match Ctx::from_id(&db_inner, task_id).await {
174 Ok(ctx) => {
175 if let Err(e) = (resume)(ctx).await {
176 tracing::error!(
177 workflow = %task_name,
178 id = %task_id,
179 error = %e,
180 "recovered workflow failed"
181 );
182 }
183 }
184 Err(e) => {
185 tracing::error!(
186 workflow = %task_name,
187 id = %task_id,
188 error = %e,
189 "failed to attach to recovered workflow"
190 );
191 }
192 }
193 });
194 } else {
195 tracing::warn!(
196 workflow = %task.name,
197 handler = ?task.handler,
198 id = %task.id,
199 "no registered handler for recovered task — use Ctx::from_id() to resume manually"
200 );
201 }
202 }
203}
204
205fn start_recovery_dispatch_loop(
207 db: DatabaseConnection,
208 executor_id: String,
209 staleness_threshold: std::time::Duration,
210) {
211 tokio::spawn(async move {
212 let executor = Executor::new(db.clone(), executor_id);
213 let mut ticker = tokio::time::interval(staleness_threshold);
214 loop {
215 ticker.tick().await;
216
217 let mut all_recovered_ids = Vec::new();
219
220 match executor.recover().await {
222 Ok(ref recovered) if !recovered.is_empty() => {
223 tracing::info!(
224 "recovered {} stale tasks (timeout/deadline)",
225 recovered.len()
226 );
227 all_recovered_ids.extend(recovered.iter().map(|r| r.id));
228 dispatch_recovered(&db, recovered);
229 }
230 Err(e) => tracing::warn!("timeout recovery failed: {e}"),
231 _ => {}
232 }
233
234 match executor.recover_stale_tasks(staleness_threshold).await {
236 Ok(ref recovered) if !recovered.is_empty() => {
237 tracing::info!(
238 "recovered {} stale tasks from dead workers",
239 recovered.len()
240 );
241 all_recovered_ids.extend(recovered.iter().map(|r| r.id));
242 dispatch_recovered(&db, recovered);
243 }
244 Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
245 _ => {}
246 }
247
248 if !all_recovered_ids.is_empty() {
253 match executor.reset_orphaned_steps(&all_recovered_ids).await {
254 Ok(n) if n > 0 => {
255 tracing::info!("reset {n} orphaned steps to PENDING");
256 }
257 Err(e) => tracing::warn!("failed to reset orphaned steps: {e}"),
258 _ => {}
259 }
260 }
261 }
262 });
263}
264
265pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
275 let mut opt = ConnectOptions::new(database_url);
276 opt.set_schema_search_path("public,durable");
277 let db = Database::connect(opt).await?;
278 durable_db::Migrator::up(&db, None).await?;
279 tracing::info!("durable initialized (db only)");
280 Ok(db)
281}