1pub mod ctx;
2pub mod error;
3pub mod executor;
4
5pub use ctx::Ctx;
6pub use ctx::RetryPolicy;
7pub use ctx::{TaskQuery, TaskSort, TaskSummary};
8pub use durable_db::entity::sea_orm_active_enums::TaskStatus;
9pub use durable_macros::{step, workflow};
10pub use error::DurableError;
11pub use executor::{Executor, HeartbeatConfig, RecoveredTask};
12pub use sea_orm::DatabaseTransaction;
13
14use sea_orm::{ConnectOptions, Database, DatabaseConnection};
15use sea_orm_migration::MigratorTrait;
16
17pub struct DurableInstance {
38 db: DatabaseConnection,
39 executor_id: String,
40 recovered: Vec<RecoveredTask>,
41}
42
43impl std::ops::Deref for DurableInstance {
44 type Target = DatabaseConnection;
45 fn deref(&self) -> &DatabaseConnection {
46 &self.db
47 }
48}
49
50impl DurableInstance {
51 pub fn db(&self) -> &DatabaseConnection {
53 &self.db
54 }
55
56 pub fn executor_id(&self) -> &str {
58 &self.executor_id
59 }
60
61 pub fn recovered(&self) -> &[RecoveredTask] {
66 &self.recovered
67 }
68
69 pub async fn start_workflow(
72 &self,
73 name: &str,
74 input: Option<serde_json::Value>,
75 ) -> Result<Ctx, DurableError> {
76 Ctx::start_with_executor(&self.db, name, input, Some(self.executor_id.clone())).await
77 }
78
79 pub async fn start_workflow_with_timeout(
82 &self,
83 name: &str,
84 input: Option<serde_json::Value>,
85 timeout_ms: i64,
86 ) -> Result<Ctx, DurableError> {
87 Ctx::start_with_timeout_and_executor(
88 &self.db,
89 name,
90 input,
91 timeout_ms,
92 Some(self.executor_id.clone()),
93 )
94 .await
95 }
96}
97
98pub async fn init(database_url: &str) -> Result<DurableInstance, DurableError> {
111 init_with_config(database_url, HeartbeatConfig::default()).await
112}
113
114pub async fn init_with_config(
116 database_url: &str,
117 config: HeartbeatConfig,
118) -> Result<DurableInstance, DurableError> {
119 let mut opt = ConnectOptions::new(database_url);
120 opt.set_schema_search_path("public,durable");
123 let db = Database::connect(opt).await?;
124
125 durable_db::Migrator::up(&db, None).await?;
127
128 let executor_id = format!("exec-{}-{}", std::process::id(), uuid::Uuid::new_v4());
130 let executor = Executor::new(db.clone(), executor_id.clone());
131
132 executor.heartbeat().await?;
134
135 let mut all_recovered = Vec::new();
136
137 let recovered = executor.recover().await?;
139 if !recovered.is_empty() {
140 tracing::info!(
141 "recovered {} stale tasks (timeout/deadline)",
142 recovered.len()
143 );
144 }
145 all_recovered.extend(recovered);
146
147 let recovered = executor
149 .recover_stale_tasks(config.staleness_threshold)
150 .await?;
151 if !recovered.is_empty() {
152 tracing::info!(
153 "recovered {} stale tasks from dead/unknown workers",
154 recovered.len()
155 );
156 }
157 all_recovered.extend(recovered);
158
159 executor.start_heartbeat(&config);
161 executor.start_recovery_loop(&config);
162
163 tracing::info!("durable initialized (executor={})", executor.executor_id());
164 Ok(DurableInstance {
165 db,
166 executor_id,
167 recovered: all_recovered,
168 })
169}
170
171pub async fn init_db(database_url: &str) -> Result<DatabaseConnection, DurableError> {
180 let mut opt = ConnectOptions::new(database_url);
181 opt.set_schema_search_path("public,durable");
182 let db = Database::connect(opt).await?;
183 durable_db::Migrator::up(&db, None).await?;
184 tracing::info!("durable initialized (db only)");
185 Ok(db)
186}