1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14pub use inventory;
16
17use sea_orm::{ConnectOptions, Database, DatabaseConnection};
18use sea_orm_migration::MigratorTrait;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::RwLock;
22
/// Process-wide executor id, set exactly once during `init_with_config`.
static EXECUTOR_ID: RwLock<Option<String>> = RwLock::new(None);

/// Returns this process's executor id, or `None` when `init` has not run
/// yet (or the lock is poisoned — treated as "not initialized").
pub fn executor_id() -> Option<String> {
    match EXECUTOR_ID.read() {
        Ok(guard) => guard.clone(),
        Err(_) => None,
    }
}
31
/// A workflow handler collected at link time via `inventory`
/// (presumably emitted by the `#[workflow]` macro re-exported above —
/// confirm in `durable_macros`).
pub struct WorkflowRegistration {
    /// Registration key; matched against a recovered task's handler
    /// (or, failing that, its name) in `dispatch_recovered`.
    pub name: &'static str,
    /// Re-attaches to an existing task via its `Ctx` and drives the
    /// workflow to completion; boxed so every workflow shares one signature.
    pub resume_fn: fn(Ctx) -> Pin<Box<dyn Future<Output = Result<(), DurableError>> + Send>>,
}
46
47inventory::collect!(WorkflowRegistration);
48
49pub fn find_workflow(name: &str) -> Option<&'static WorkflowRegistration> {
51 inventory::iter::<WorkflowRegistration>().find(|r| r.name == name)
52}
53
/// Connects to the database, runs migrations, and starts the durable
/// runtime using the default `HeartbeatConfig`.
///
/// Returns the connection together with every task recovered during
/// startup (see `init_with_config` for the full startup sequence).
///
/// # Errors
/// Propagates any `DurableError` from `init_with_config`.
pub async fn init(
    database_url: &str,
) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
    init_with_config(database_url, HeartbeatConfig::default()).await
}
75
/// Connects to the database, runs migrations, recovers stale tasks, and
/// starts the background heartbeat and recovery loops.
///
/// Startup sequence:
/// 1. Connect with the `public,durable` schema search path and apply the
///    `durable_db` migrations.
/// 2. Generate a process-unique executor id and publish it via
///    `EXECUTOR_ID`.
/// 3. Write an initial heartbeat so peers see this executor as live.
/// 4. Recover tasks past their timeout/deadline, then tasks whose owning
///    worker stopped heartbeating within `config.staleness_threshold`.
/// 5. Auto-resume recovered root tasks that have registered handlers,
///    then spawn the periodic heartbeat and recovery loops.
///
/// Returns the connection plus all tasks recovered during startup —
/// including ones without a registered handler, which the caller must
/// resume manually via `Ctx::from_id`.
///
/// # Errors
/// Fails on connection, migration, heartbeat, or recovery errors.
pub async fn init_with_config(
    database_url: &str,
    config: HeartbeatConfig,
) -> Result<(DatabaseConnection, Vec<RecoveredTask>), DurableError> {
    let mut opt = ConnectOptions::new(database_url);
    opt.set_schema_search_path("public,durable");
    let db = Database::connect(opt).await?;

    durable_db::Migrator::up(&db, None).await?;

    // Unique per process AND per run: a restarted process reusing the same
    // pid still gets a distinct id thanks to the UUID component.
    let eid = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
    // Best-effort publish: a poisoned lock is silently skipped rather than
    // propagated — `executor_id()` would then report None.
    if let Ok(mut guard) = EXECUTOR_ID.write() {
        *guard = Some(eid.clone());
    }
    let executor = Executor::new(db.clone(), eid);

    // Register as alive immediately so other workers don't reclaim our
    // tasks as stale before the periodic heartbeat starts.
    executor.heartbeat().await?;

    let mut all_recovered = Vec::new();

    // Pass 1: tasks that exceeded their own timeout/deadline.
    let recovered = executor.recover().await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks (timeout/deadline)",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Pass 2: tasks owned by workers that stopped heartbeating.
    let recovered = executor
        .recover_stale_tasks(config.staleness_threshold)
        .await?;
    if !recovered.is_empty() {
        tracing::info!(
            "recovered {} stale tasks from dead/unknown workers",
            recovered.len()
        );
    }
    all_recovered.extend(recovered);

    // Fire-and-forget auto-resume of recovered root workflows.
    dispatch_recovered(&db, &all_recovered);

    executor.start_heartbeat(&config);

    // Keep sweeping for stale tasks for the lifetime of the process.
    start_recovery_dispatch_loop(
        db.clone(),
        executor.executor_id().to_string(),
        config.staleness_threshold,
    );

    tracing::info!("durable initialized (executor={})", executor.executor_id());
    Ok((db, all_recovered))
}
138
/// Spawns a detached auto-resume task for each recovered root workflow
/// that has a registered handler.
///
/// Child tasks (`parent_id.is_some()`) are skipped — presumably they are
/// replayed by their parent workflow rather than resumed directly (TODO
/// confirm against the executor's recovery semantics). Tasks with no
/// registered handler are only logged; callers must resume those manually
/// via `Ctx::from_id`. Resume failures are logged, never propagated.
fn dispatch_recovered(db: &DatabaseConnection, recovered: &[RecoveredTask]) {
    for task in recovered {
        if task.parent_id.is_some() {
            // Sub-task: not dispatched here.
            continue;
        }

        // Registration key: the explicit handler name when present,
        // otherwise the task's own name.
        let lookup_key = task.handler.as_deref().unwrap_or(&task.name);
        if let Some(reg) = find_workflow(lookup_key) {
            // Clone/copy everything the detached task needs to own.
            let db_inner = db.clone();
            let task_id = task.id;
            let task_name = task.name.clone();
            let resume = reg.resume_fn;
            tokio::spawn(async move {
                tracing::info!(
                    workflow = %task_name,
                    id = %task_id,
                    "auto-resuming recovered workflow"
                );
                match Ctx::from_id(&db_inner, task_id).await {
                    Ok(ctx) => {
                        if let Err(e) = (resume)(ctx).await {
                            tracing::error!(
                                workflow = %task_name,
                                id = %task_id,
                                error = %e,
                                "recovered workflow failed"
                            );
                        }
                    }
                    Err(e) => {
                        // Could not even rebuild the Ctx for this task id.
                        tracing::error!(
                            workflow = %task_name,
                            id = %task_id,
                            error = %e,
                            "failed to attach to recovered workflow"
                        );
                    }
                }
            });
        } else {
            tracing::warn!(
                workflow = %task.name,
                handler = ?task.handler,
                id = %task.id,
                "no registered handler for recovered task — use Ctx::from_id() to resume manually"
            );
        }
    }
}
191
/// Spawns the background loop that periodically recovers and re-dispatches
/// stale tasks for the lifetime of the process.
///
/// Every `staleness_threshold` it performs the same two sweeps as startup
/// (timeout/deadline, then dead-worker heartbeat) and auto-resumes anything
/// found via `dispatch_recovered`. Recovery errors are logged and the loop
/// keeps running.
fn start_recovery_dispatch_loop(
    db: DatabaseConnection,
    executor_id: String,
    staleness_threshold: std::time::Duration,
) {
    tokio::spawn(async move {
        // Fresh Executor handle bound to the same executor id as init used.
        let executor = Executor::new(db.clone(), executor_id);
        // NOTE(review): tokio's interval completes its first tick
        // immediately, so the first sweep runs right after startup (which
        // just recovered) — presumably harmless if recovery is idempotent;
        // confirm.
        let mut ticker = tokio::time::interval(staleness_threshold);
        loop {
            ticker.tick().await;

            // Sweep 1: tasks past their timeout/deadline.
            match executor.recover().await {
                Ok(ref recovered) if !recovered.is_empty() => {
                    tracing::info!(
                        "recovered {} stale tasks (timeout/deadline)",
                        recovered.len()
                    );
                    dispatch_recovered(&db, recovered);
                }
                Err(e) => tracing::warn!("timeout recovery failed: {e}"),
                _ => {}
            }

            // Sweep 2: tasks owned by workers that stopped heartbeating.
            match executor.recover_stale_tasks(staleness_threshold).await {
                Ok(ref recovered) if !recovered.is_empty() => {
                    tracing::info!(
                        "recovered {} stale tasks from dead workers",
                        recovered.len()
                    );
                    dispatch_recovered(&db, recovered);
                }
                Err(e) => tracing::warn!("heartbeat recovery failed: {e}"),
                _ => {}
            }
        }
    });
}
232
233pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
243 let mut opt = ConnectOptions::new(database_url);
244 opt.set_schema_search_path("public,durable");
245 let db = Database::connect(opt).await?;
246 durable_db::Migrator::up(&db, None).await?;
247 tracing::info!("durable initialized (db only)");
248 Ok(db)
249}