1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14use sea_orm::{ConnectOptions, Database, DatabaseConnection};
15use sea_orm_migration::MigratorTrait;
16
17pub async fn init(database_url: &str) -> Result<DatabaseConnection, DurableError> {
27 init_with_config(database_url, HeartbeatConfig::default()).await
28}
29
30pub async fn init_with_config(
32 database_url: &str,
33 config: HeartbeatConfig,
34) -> Result<DatabaseConnection, DurableError> {
35 let mut opt = ConnectOptions::new(database_url);
36 opt.set_schema_search_path("public,durable");
39 let db = Database::connect(opt).await?;
40
41 durable_db::Migrator::up(&db, None).await?;
43
44 let executor_id = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
46 let executor = Executor::new(db.clone(), executor_id);
47
48 executor.heartbeat().await?;
50
51 let recovered = executor.recover().await?;
53 if !recovered.is_empty() {
54 tracing::info!(
55 "recovered {} stale tasks (timeout/deadline)",
56 recovered.len()
57 );
58 }
59
60 let count = executor
62 .recover_stale_tasks(config.staleness_threshold)
63 .await?;
64 if count > 0 {
65 tracing::info!("recovered {count} stale tasks from dead workers");
66 }
67
68 executor.start_heartbeat(&config);
70 executor.start_recovery_loop(&config);
71
72 tracing::info!("durable initialized (executor={})", executor.executor_id());
73 Ok(db)
74}
75
76pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
85 let mut opt = ConnectOptions::new(database_url);
86 opt.set_schema_search_path("public,durable");
87 let db = Database::connect(opt).await?;
88 durable_db::Migrator::up(&db, None).await?;
89 tracing::info!("durable initialized (db only)");
90 Ok(db)
91}