1#![allow(missing_docs)]
3
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use deadpool_postgres::{Pool, PoolError};
7use futures::Stream;
8use ora_backend::{
9 Backend,
10 common::{NextPageToken, TimeRange},
11 executions::{
12 ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
13 SucceededExecution,
14 },
15 executors::ExecutorId,
16 jobs::{
17 AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
18 NewJob,
19 },
20 schedules::{
21 AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
22 ScheduleId, ScheduleOrderBy, StoppedSchedule,
23 },
24};
25
26use refinery::embed_migrations;
27use thiserror::Error;
28use uuid::Uuid;
29
30use crate::{
31 db::{DbPool, DbTransaction},
32 query::{
33 jobs::{cancel_jobs, job_count},
34 schedules::schedule_count,
35 },
36};
37
// Embeds the SQL migration files from ./migrations into the binary; they are
// applied by `PostgresBackend::new` via the generated `migrations` module.
embed_migrations!("./migrations");

mod db;
mod models;
mod query;
43
/// A PostgreSQL-backed implementation of [`ora_backend::Backend`].
#[must_use]
pub struct PostgresBackend {
    // Connection pool wrapper; every operation checks a connection out of it.
    pool: DbPool,
    // Maximum rows removed per DELETE batch in `delete_history`.
    delete_batch_size: usize,
}
50
/// Convenience alias binding this crate's [`Error`] type.
type Result<T> = core::result::Result<T, Error>;
52
/// Errors produced by [`PostgresBackend`] operations.
#[derive(Error, Debug)]
pub enum Error {
    /// Applying an embedded database migration failed.
    #[error("{0}")]
    Migrations(#[from] refinery::Error),
    /// Checking a connection out of the pool failed.
    #[error("{0}")]
    Pool(#[from] PoolError),
    /// An underlying PostgreSQL statement failed.
    #[error("{0}")]
    Postgres(#[from] tokio_postgres::Error),
    /// (De)serializing a JSON policy or template payload failed.
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    /// A pagination token could not be decoded.
    #[error("invalid page token: {0}")]
    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
}
72
impl PostgresBackend {
    /// Creates a new backend on top of the given connection pool.
    ///
    /// Ensures the `ora` schema exists and runs any pending embedded
    /// migrations (tracked in the `ora.migrations` table) before returning.
    ///
    /// # Errors
    ///
    /// Returns an error if a connection cannot be checked out, the schema
    /// cannot be created, or a migration fails.
    pub async fn new(pool: Pool) -> Result<Self> {
        let mut conn = pool.get().await?;
        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
        migrations::runner()
            .set_migration_table_name("ora.migrations")
            .run_async(&mut **conn)
            .await?;
        Ok(Self {
            pool: DbPool(pool),
            // Default batch size; keeps individual DELETE transactions short.
            delete_batch_size: 40_000,
        })
    }

    /// Overrides the DELETE batch size used by `delete_history`.
    ///
    /// # Panics
    ///
    /// Panics if `batch_size` is zero.
    pub fn with_delete_batch_size(mut self, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch size must be greater than zero");
        self.delete_batch_size = batch_size;
        self
    }
}
100
101impl Backend for PostgresBackend {
102 type Error = Error;
103
    /// Registers job types, upserting on `id` in one batched statement.
    ///
    /// The `WHERE` clause on the conflict action skips the write when nothing
    /// actually changed, avoiding needless row churn. No-op for an empty slice.
    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
        if job_types.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        // Column-per-field vectors: the whole batch goes through a single
        // UNNEST-based INSERT rather than one statement per job type.
        let mut col_id = Vec::with_capacity(job_types.len());
        let mut col_description = Vec::with_capacity(job_types.len());
        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
        let mut col_output_schema_json = Vec::with_capacity(job_types.len());

        for job_type in job_types {
            col_id.push(job_type.id.as_str());
            col_description.push(job_type.description.as_deref());
            col_input_schema_json.push(job_type.input_schema_json.as_deref());
            col_output_schema_json.push(job_type.output_schema_json.as_deref());
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job_type (
                        id,
                        description,
                        input_schema_json,
                        output_schema_json
                    ) SELECT * FROM UNNEST(
                        $1::TEXT[],
                        $2::TEXT[],
                        $3::TEXT[],
                        $4::TEXT[]
                    ) ON CONFLICT (id) DO UPDATE SET
                        description = EXCLUDED.description,
                        input_schema_json = EXCLUDED.input_schema_json,
                        output_schema_json = EXCLUDED.output_schema_json
                    WHERE
                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_description,
                    &col_input_schema_json,
                    &col_output_schema_json,
                ],
            )
            .await?;
        }

        tx.commit().await?;

        Ok(())
    }
167
    /// Returns all registered job types, ordered by id.
    async fn list_job_types(&self) -> Result<Vec<JobType>> {
        let mut conn = self.pool.get().await?;

        let tx = conn.read_only_transaction().await?;

        let job_types = {
            let stmt = tx
                .prepare(
                    r#"--sql
                    SELECT
                        id,
                        description,
                        input_schema_json,
                        output_schema_json
                    FROM
                        ora.job_type
                    ORDER BY id
                    "#,
                )
                .await?;

            let rows = tx.query(&stmt, &[]).await?;

            rows.into_iter()
                .map(|row| {
                    Result::<_>::Ok(JobType {
                        // Ids were validated when inserted; skip re-validation.
                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
                        description: row.try_get(1)?,
                        input_schema_json: row.try_get(2)?,
                        output_schema_json: row.try_get(3)?,
                    })
                })
                .collect::<Result<Vec<_>>>()?
        };

        tx.commit().await?;

        Ok(job_types)
    }
207
    /// Inserts a batch of new jobs, their labels, and one initial execution
    /// per job, all within a single transaction.
    ///
    /// When `if_not_exists` is given and any job already matches the filters,
    /// nothing is inserted and the existing job ids are returned instead.
    /// Jobs created on behalf of a schedule also become that schedule's
    /// `active_job_id`. No-op for an empty slice.
    async fn add_jobs(
        &self,
        jobs: &[NewJob],
        if_not_exists: Option<JobFilters>,
    ) -> Result<AddedJobs> {
        if jobs.is_empty() {
            return Ok(AddedJobs::Added(Vec::new()));
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists {
            let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;

            if !existing_job_ids.is_empty() {
                tx.commit().await?;
                return Ok(AddedJobs::Existing(existing_job_ids));
            }
        }

        // Declared outside the block below: the generated ids are needed again
        // for the label insert and the schedule update.
        let mut col_id = Vec::with_capacity(jobs.len());
        let mut col_schedule_id = Vec::with_capacity(jobs.len());

        {
            let mut col_job_type_id = Vec::with_capacity(jobs.len());
            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());

            for job in jobs {
                // UUIDv7 ids are time-ordered, so id order tracks creation order.
                col_id.push(Uuid::now_v7());
                col_job_type_id.push(job.job.job_type_id.as_str());
                col_target_execution_time.push(job.job.target_execution_time);
                col_input_payload_json.push(job.job.input_payload_json.as_str());
                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
                col_schedule_id.push(job.schedule_id.map(|s| s.0));
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job (
                        id,
                        job_type_id,
                        target_execution_time,
                        input_payload_json,
                        timeout_policy_json,
                        retry_policy_json,
                        schedule_id
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TIMESTAMPTZ[],
                        $4::TEXT[],
                        $5::TEXT[],
                        $6::TEXT[],
                        $7::UUID[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_type_id,
                    &col_target_execution_time,
                    &col_input_payload_json,
                    &col_timeout_policy_json,
                    &col_retry_policy_json,
                    &col_schedule_id,
                ],
            )
            .await?;
        }

        {
            // Labels are flattened to (job_id, key, value) triples for one
            // batched insert; jobs without labels contribute no rows.
            let mut col_job_id = Vec::with_capacity(jobs.len());
            let mut col_job_label_key = Vec::with_capacity(jobs.len());
            let mut col_job_label_value = Vec::with_capacity(jobs.len());

            for (i, job) in jobs.iter().enumerate() {
                for label in &job.job.labels {
                    col_job_id.push(col_id[i]);
                    col_job_label_key.push(label.key.as_str());
                    col_job_label_value.push(label.value.as_str());
                }
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job_label (
                        job_id,
                        label_key,
                        label_value
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[&col_job_id, &col_job_label_key, &col_job_label_value],
            )
            .await?;
        }

        {
            // Point owning schedules at their newly created job; rows whose
            // schedule_id is NULL in the UNNEST simply match nothing.
            let stmt = tx
                .prepare(
                    r#"--sql
                    UPDATE ora.schedule
                    SET
                        active_job_id = t.job_id
                    FROM UNNEST(
                        $1::UUID[],
                        $2::UUID[]
                    ) AS t(job_id, schedule_id)
                    WHERE
                        ora.schedule.id = t.schedule_id
                    "#,
                )
                .await?;

            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
        }

        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
        // Every new job starts with one pending execution.
        add_executions(&tx, &job_ids).await?;

        tx.commit().await?;

        Ok(AddedJobs::Added(job_ids))
    }
352
353 async fn list_jobs(
354 &self,
355 filters: JobFilters,
356 order_by: Option<JobOrderBy>,
357 page_size: u32,
358 page_token: Option<NextPageToken>,
359 ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
360 let mut conn = self.pool.get().await?;
361 let tx = conn.read_only_transaction().await?;
362
363 let (jobs, next_page_token) = query::jobs::job_details(
364 &tx,
365 filters,
366 order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
367 page_size,
368 page_token.map(|t| t.0),
369 )
370 .await?;
371
372 tx.commit().await?;
373
374 Ok((jobs, next_page_token))
375 }
376
377 async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
378 let mut conn = self.pool.get().await?;
379 let tx = conn.read_only_transaction().await?;
380 let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
381 tx.commit().await?;
382 Ok(count)
383 }
384
385 async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
386 let mut conn = self.pool.get().await?;
387 let tx = conn.transaction().await?;
388 let now = std::time::SystemTime::now();
389
390 let jobs = cancel_jobs(&tx, filters).await?;
391 mark_jobs_inactive(
392 &tx,
393 &jobs.iter().map(|j| (j.job_id, now)).collect::<Vec<_>>(),
394 )
395 .await?;
396 tx.commit().await?;
397
398 Ok(jobs)
399 }
400
    /// Inserts a batch of schedule definitions and their labels in a single
    /// transaction.
    ///
    /// When `if_not_exists` is given and any schedule already matches the
    /// filters, nothing is inserted and the existing schedule ids are
    /// returned instead. No-op for an empty slice.
    async fn add_schedules(
        &self,
        schedules: &[ScheduleDefinition],
        if_not_exists: Option<ScheduleFilters>,
    ) -> Result<AddedSchedules> {
        if schedules.is_empty() {
            return Ok(AddedSchedules::Added(Vec::new()));
        }

        let mut conn = self.pool.get().await?;
        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists {
            let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;

            if !existing_schedule_ids.is_empty() {
                tx.commit().await?;
                return Ok(AddedSchedules::Existing(existing_schedule_ids));
            }
        }

        // Column-per-field vectors for a single UNNEST-based INSERT.
        let mut col_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_json = Vec::with_capacity(schedules.len());
        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
        let mut col_start_after = Vec::with_capacity(schedules.len());
        let mut col_end_before = Vec::with_capacity(schedules.len());

        for schedule in schedules {
            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);

            // UUIDv7 ids are time-ordered, so id order tracks creation order.
            col_id.push(Uuid::now_v7());
            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
            col_start_after.push(start_after);
            col_end_before.push(end_before);
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.schedule (
                        id,
                        job_template_job_type_id,
                        job_template_json,
                        scheduling_policy_json,
                        start_after,
                        end_before
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[],
                        $4::TEXT[],
                        $5::TIMESTAMPTZ[],
                        $6::TIMESTAMPTZ[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_template_job_type_id,
                    &col_job_template_json,
                    &col_scheduling_policy_json,
                    &col_start_after,
                    &col_end_before,
                ],
            )
            .await?;
        }

        {
            // Labels flattened to (schedule_id, key, value) triples for one
            // batched insert; schedules without labels contribute no rows.
            let mut col_schedule_id = Vec::with_capacity(schedules.len());
            let mut col_schedule_label_key = Vec::with_capacity(schedules.len());
            let mut col_schedule_label_value = Vec::with_capacity(schedules.len());

            for (i, schedule) in schedules.iter().enumerate() {
                for label in &schedule.labels {
                    col_schedule_id.push(col_id[i]);
                    col_schedule_label_key.push(label.key.as_str());
                    col_schedule_label_value.push(label.value.as_str());
                }
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.schedule_label (
                        schedule_id,
                        label_key,
                        label_value
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_schedule_id,
                    &col_schedule_label_key,
                    &col_schedule_label_value,
                ],
            )
            .await?;
        }

        tx.commit().await?;

        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();

        Ok(AddedSchedules::Added(schedule_ids))
    }
523
524 async fn list_schedules(
525 &self,
526 filters: ScheduleFilters,
527 order_by: Option<ScheduleOrderBy>,
528 page_size: u32,
529 page_token: Option<NextPageToken>,
530 ) -> Result<(
531 Vec<ScheduleDetails>,
532 Option<ora_backend::common::NextPageToken>,
533 )> {
534 let mut conn = self.pool.get().await?;
535 let tx = conn.read_only_transaction().await?;
536
537 let (schedules, next_page_token) = query::schedules::schedule_details(
538 &tx,
539 filters,
540 order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
541 page_size,
542 page_token.map(|t| t.0),
543 )
544 .await?;
545
546 tx.commit().await?;
547
548 Ok((schedules, next_page_token))
549 }
550
551 async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
552 let mut conn = self.pool.get().await?;
553 let tx = conn.read_only_transaction().await?;
554 let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
555 tx.commit().await?;
556 Ok(count)
557 }
558
559 async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
560 let mut conn = self.pool.get().await?;
561 let tx = conn.transaction().await?;
562 let schedules = query::schedules::stop_schedules(&tx, filters).await?;
563 tx.commit().await?;
564 Ok(schedules)
565 }
566
    /// Streams batches (at most 1000 rows each) of executions whose target
    /// time has passed, ending once a page comes back empty.
    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
        async_stream::try_stream!({
            // Keyset-pagination cursor: the last execution id yielded so far.
            // Ids are UUIDv7 and thus roughly time-ordered.
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                // Each page runs in its own short read-only transaction.
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                // The two arms differ only in the `id > $1` cursor predicate.
                // NOTE(review): status = 0 appears to denote a not-yet-started
                // execution — the same filter is used by
                // `wait_for_ready_executions`; confirm against the schema.
                // Column 4 is the job's total execution count (attempt number),
                // column 6 the target time as epoch seconds.
                let rows = if let Some(last_execution_id) = last_execution_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut ready_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    ready_executions.push(ReadyExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
                        input_payload_json: row.try_get(3)?,
                        // COUNT(*) is non-negative, so the cast is lossless.
                        attempt_number: row.try_get::<_, i64>(4)? as u64,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        // Timestamps round-trip through epoch seconds.
                        target_execution_time: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
                    });
                }

                last_execution_id = ready_executions.last().map(|e| e.execution_id);

                yield ready_executions;
            }
        })
    }
659
    /// Blocks until an execution not listed in `ignore` is due to run.
    ///
    /// Looks up the earliest pending target time: if it has already passed,
    /// returns immediately; if it is in the future, sleeps until then and
    /// returns. If no pending execution exists, re-polls every 500ms.
    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();

        loop {
            let mut conn = self.pool.get().await?;

            let next_timestamp = {
                let tx = conn.read_only_transaction().await?;

                // Earliest target time among pending (status = 0) executions,
                // excluding the ignored ids, as epoch seconds.
                let stmt = tx
                    .prepare(
                        r#"--sql
                        SELECT
                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
                        FROM
                            ora.job
                        JOIN ora.execution ON
                            ora.execution.job_id = ora.job.id
                        WHERE
                            ora.execution.status = 0
                            AND NOT (ora.execution.id = ANY($1::UUID[]))
                        ORDER BY
                            ora.job.target_execution_time ASC
                        LIMIT 1
                        "#,
                    )
                    .await?;

                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;

                tx.commit().await?;

                match row {
                    Some(row) => row
                        .try_get::<_, Option<f64>>(0)?
                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                    None => None,
                }
            };

            // Return the connection to the pool before sleeping.
            drop(conn);

            if let Some(next_timestamp) = next_timestamp {
                let now = std::time::SystemTime::now();

                if next_timestamp <= now {
                    break;
                }

                // Sleep until the earliest pending execution becomes due,
                // then return (it is ready at that point).
                tokio::time::sleep(
                    next_timestamp
                        .duration_since(now)
                        .unwrap_or_else(|_| Duration::from_secs(0)),
                )
                .await;
                break;
            }

            // No pending executions yet: poll again shortly.
            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        Ok(())
    }
723
    /// Streams batches (at most 1000 rows each) of executions that are
    /// currently running, ending once a page comes back empty.
    fn in_progress_executions(
        &self,
    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
        async_stream::try_stream!({
            // Keyset-pagination cursor: the last execution id yielded so far.
            // Ids are UUIDv7 and thus roughly time-ordered.
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                // Each page runs in its own short read-only transaction.
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                // The two arms differ only in the `id > $1` cursor predicate.
                // NOTE(review): status = 1 appears to denote a started but
                // unfinished execution — confirm against the schema. Column 7
                // is the job's total execution count (attempt number).
                let rows = if let Some(last_execution_id) = last_execution_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut in_progress_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    in_progress_executions.push(InProgressExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        executor_id: ExecutorId(row.try_get(2)?),
                        // Timestamps round-trip through epoch seconds.
                        target_execution_time: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
                        started_at: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
                        // COUNT(*) is non-negative, so the cast is lossless.
                        attempt_number: row.try_get::<_, i64>(7)? as u64,
                    });
                }

                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);

                yield in_progress_executions;
            }
        })
    }
820
    /// Records that the given executions were picked up by executors.
    ///
    /// Sets `executor_id` and `started_at` in one UNNEST-batched UPDATE; the
    /// `started_at IS NULL` guard makes repeated reports for the same
    /// execution no-ops. No-op for an empty slice.
    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.execution
                SET
                    executor_id = t.executor_id,
                    started_at = to_timestamp(t.started_at)
                FROM UNNEST(
                    $1::UUID[],
                    $2::UUID[],
                    $3::DOUBLE PRECISION[]
                ) AS t(execution_id, executor_id, started_at)
                WHERE
                    execution_id = id
                    AND ora.execution.started_at IS NULL
                "#,
            )
            .await?;

        let mut col_execution_id = Vec::with_capacity(executions.len());
        let mut col_executor_id = Vec::with_capacity(executions.len());
        let mut col_started_at = Vec::with_capacity(executions.len());

        for execution in executions {
            col_execution_id.push(execution.execution_id.0);
            col_executor_id.push(execution.executor_id.0);
            // Timestamps are shipped as epoch seconds; pre-epoch times clamp
            // to zero rather than erroring.
            col_started_at.push(
                execution
                    .started_at
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
        }

        tx.execute(
            &stmt,
            &[&col_execution_id, &col_executor_id, &col_started_at],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }
875
    /// Records successful completions and retires the affected jobs.
    ///
    /// The `status < 2` guard leaves already-finished executions untouched,
    /// so only the rows actually transitioned feed into the job retirement
    /// below (which also clears any owning schedule's `active_job_id`).
    /// No-op for an empty slice.
    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.execution
                SET
                    succeeded_at = to_timestamp(t.succeeded_at),
                    output_json = t.output_json
                FROM UNNEST(
                    $1::UUID[],
                    $2::DOUBLE PRECISION[],
                    $3::TEXT[]
                ) AS t(execution_id, succeeded_at, output_json)
                WHERE
                    execution_id = id
                    AND status < 2
                RETURNING job_id, t.succeeded_at
                "#,
            )
            .await?;

        let mut col_execution_id = Vec::with_capacity(executions.len());
        let mut col_succeeded_at = Vec::with_capacity(executions.len());
        let mut col_output_json = Vec::with_capacity(executions.len());

        for execution in executions {
            col_execution_id.push(execution.execution_id.0);
            // Timestamps are shipped as epoch seconds; pre-epoch times clamp
            // to zero rather than erroring.
            col_succeeded_at.push(
                execution
                    .succeeded_at
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
            col_output_json.push(execution.output_json.as_str());
        }

        let rows = tx
            .query(
                &stmt,
                &[&col_execution_id, &col_succeeded_at, &col_output_json],
            )
            .await?;

        let mut jobs = Vec::with_capacity(rows.len());
        for row in rows {
            jobs.push((
                JobId(row.try_get(0)?),
                UNIX_EPOCH + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(1)?),
            ));
        }

        mark_jobs_inactive(&tx, &jobs).await?;

        tx.commit().await?;

        Ok(())
    }
942
943 async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
944 if executions.is_empty() {
945 return Ok(());
946 }
947
948 let mut conn = self.pool.get().await?;
949 let tx = conn.transaction().await?;
950
951 let jobs = executions_failed(&tx, executions).await?;
952
953 mark_jobs_inactive(&tx, &jobs).await?;
954
955 tx.commit().await?;
956
957 Ok(())
958 }
959
960 async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
961 if executions.is_empty() {
962 return Ok(());
963 }
964
965 let mut conn = self.pool.get().await?;
966 let tx = conn.transaction().await?;
967
968 let jobs = executions_failed(&tx, executions).await?;
969 add_executions(
970 &tx,
971 &jobs.into_iter().map(|(id, ..)| id).collect::<Vec<_>>(),
972 )
973 .await?;
974
975 tx.commit().await?;
976
977 Ok(())
978 }
979
    /// Streams batches (at most 1000 rows each) of schedules that need a new
    /// job spawned: not stopped and with no active job. Ends once a page
    /// comes back empty.
    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
        async_stream::try_stream!({
            // Keyset-pagination cursor: the last schedule id yielded so far.
            let mut last_schedule_id: Option<ScheduleId> = None;
            loop {
                // Each page runs in its own short read-only transaction.
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                // The two arms differ only in the `id > $1` cursor predicate.
                // The scalar subquery picks the schedule's most recently
                // created job (UUIDv7 ids order by creation) and yields its
                // target time as epoch seconds, or NULL for no jobs yet.
                let rows = if let Some(last_schedule_id) = last_schedule_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                (
                                    ora.schedule.stopped_at IS NULL
                                    AND ora.schedule.active_job_id IS NULL
                                )
                                AND id > $1::UUID
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_schedule_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                ora.schedule.stopped_at IS NULL
                                AND ora.schedule.active_job_id IS NULL
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut schedules = Vec::with_capacity(rows.len());

                for row in rows {
                    schedules.push(PendingSchedule {
                        schedule_id: ScheduleId(row.try_get(0)?),
                        // NULL when the schedule has not spawned any job yet.
                        last_target_execution_time: row
                            .try_get::<_, Option<f64>>(1)?
                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
                        time_range: TimeRange {
                            start: row
                                .try_get::<_, Option<f64>>(3)?
                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                            end: row
                                .try_get::<_, Option<f64>>(4)?
                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                        },
                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
                    });
                }

                last_schedule_id = schedules.last().map(|s| s.schedule_id);
                yield schedules;
            }
        })
    }
1088
1089 async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1090 let batch_size = i64::try_from(self.delete_batch_size).unwrap_or(i64::MAX);
1091
1092 let mut conn = self.pool.get().await?;
1093
1094 loop {
1097 let mut deleted_rows = 0;
1098
1099 {
1100 let tx = conn.transaction().await?;
1101
1102 let stmt = tx
1103 .prepare(
1104 r#"--sql
1105 WITH cte AS (
1106 SELECT ctid
1107 FROM ora.job
1108 WHERE inactive_since < to_timestamp($1)
1109 ORDER BY inactive_since
1110 LIMIT $2
1111 )
1112 DELETE FROM ora.job j
1113 USING cte
1114 WHERE j.ctid = cte.ctid;
1115 "#,
1116 )
1117 .await?;
1118
1119 let deleted = tx
1120 .execute(
1121 &stmt,
1122 &[
1123 &before
1124 .duration_since(UNIX_EPOCH)
1125 .unwrap_or_default()
1126 .as_secs_f64(),
1127 &batch_size,
1128 ],
1129 )
1130 .await?;
1131
1132 deleted_rows += deleted;
1133
1134 tx.commit().await?;
1135 }
1136
1137 {
1138 let tx = conn.transaction().await?;
1139 let stmt = tx
1140 .prepare(
1141 r#"--sql
1142 WITH cte AS (
1143 SELECT ctid
1144 FROM ora.schedule
1145 WHERE stopped_at < to_timestamp($1)
1146 ORDER BY stopped_at
1147 LIMIT $2
1148 )
1149 DELETE FROM ora.schedule s
1150 USING cte
1151 WHERE s.ctid = cte.ctid;
1152 "#,
1153 )
1154 .await?;
1155
1156 let deleted = tx
1157 .execute(
1158 &stmt,
1159 &[
1160 &before
1161 .duration_since(UNIX_EPOCH)
1162 .unwrap_or_default()
1163 .as_secs_f64(),
1164 &batch_size,
1165 ],
1166 )
1167 .await?;
1168
1169 tx.commit().await?;
1170
1171 deleted_rows += deleted;
1172 }
1173
1174 {
1175 let tx = conn.transaction().await?;
1176
1177 let stmt = tx
1178 .prepare(
1179 r#"--sql
1180 DELETE FROM ora.job_type
1181 WHERE
1182 NOT EXISTS (
1183 SELECT FROM ora.job
1184 WHERE
1185 ora.job.job_type_id = ora.job_type.id
1186 )
1187 AND NOT EXISTS (
1188 SELECT FROM ora.schedule
1189 WHERE
1190 ora.schedule.job_template_job_type_id = ora.job_type.id
1191 )
1192 "#,
1193 )
1194 .await?;
1195
1196 tx.execute(&stmt, &[]).await?;
1197
1198 tx.commit().await?;
1199 }
1200
1201 if deleted_rows == 0 {
1202 break;
1203 }
1204 }
1205
1206 Ok(())
1207 }
1208
    /// Blocks until at least one schedule is pending (not stopped and with no
    /// active job), polling every 500ms.
    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
        loop {
            let mut conn = self.pool.get().await?;

            let has_pending = {
                let tx = conn.read_only_transaction().await?;

                // Existence check only; same predicate as `pending_schedules`.
                let stmt = tx
                    .prepare(
                        r#"--sql
                        SELECT 1 FROM ora.schedule
                        WHERE
                            ora.schedule.stopped_at IS NULL
                            AND ora.schedule.active_job_id IS NULL
                        LIMIT 1
                        "#,
                    )
                    .await?;

                let row = tx.query_opt(&stmt, &[]).await?;

                tx.commit().await?;

                row.is_some()
            };

            // Return the connection to the pool before sleeping.
            drop(conn);

            if has_pending {
                break;
            }

            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        Ok(())
    }
1246}
1247
1248async fn executions_failed(
1249 tx: &DbTransaction<'_>,
1250 executions: &[FailedExecution],
1251) -> Result<Vec<(JobId, SystemTime)>> {
1252 let stmt = tx
1253 .prepare(
1254 r#"--sql
1255 UPDATE ora.execution
1256 SET
1257 failed_at = to_timestamp(t.failed_at),
1258 failure_reason = t.failure_reason
1259 FROM UNNEST(
1260 $1::UUID[],
1261 $2::DOUBLE PRECISION[],
1262 $3::TEXT[]
1263 ) AS t(execution_id, failed_at, failure_reason)
1264 WHERE
1265 execution_id = id
1266 AND status < 2
1267 RETURNING job_id, t.failed_at
1268 "#,
1269 )
1270 .await?;
1271
1272 let mut col_execution_id = Vec::with_capacity(executions.len());
1273 let mut col_succeeded_at = Vec::with_capacity(executions.len());
1274 let mut col_failure_reason = Vec::with_capacity(executions.len());
1275
1276 for execution in executions {
1277 col_execution_id.push(execution.execution_id.0);
1278 col_succeeded_at.push(
1279 execution
1280 .failed_at
1281 .duration_since(UNIX_EPOCH)
1282 .unwrap_or_default()
1283 .as_secs_f64(),
1284 );
1285 col_failure_reason.push(execution.failure_reason.as_str());
1286 }
1287
1288 let rows = tx
1289 .query(
1290 &stmt,
1291 &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1292 )
1293 .await?;
1294
1295 let mut jobs = Vec::with_capacity(rows.len());
1296
1297 for row in rows {
1298 jobs.push((
1299 JobId(row.try_get(0)?),
1300 UNIX_EPOCH + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(1)?),
1301 ));
1302 }
1303
1304 Ok(jobs)
1305}
1306
1307async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1308 if jobs.is_empty() {
1309 return Ok(());
1310 }
1311
1312 let mut col_job_id = Vec::with_capacity(jobs.len());
1313 let mut col_execution_id = Vec::with_capacity(jobs.len());
1314
1315 for job in jobs {
1316 col_job_id.push(job.0);
1317 col_execution_id.push(Uuid::now_v7());
1318 }
1319
1320 let stmt = tx
1321 .prepare(
1322 r#"--sql
1323 INSERT INTO ora.execution (
1324 id,
1325 job_id
1326 ) SELECT * FROM UNNEST(
1327 $1::UUID[],
1328 $2::UUID[]
1329 )
1330 "#,
1331 )
1332 .await?;
1333
1334 tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1335
1336 Ok(())
1337}
1338
/// Marks jobs inactive as of the given times and detaches them from any
/// schedule that currently points at them.
///
/// Clearing `active_job_id` is what makes an owning schedule "pending" again
/// (see `pending_schedules`), allowing its next job to be spawned. Inactive
/// jobs become eligible for `delete_history`. No-op for an empty slice.
async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[(JobId, SystemTime)]) -> Result<()> {
    if jobs.is_empty() {
        return Ok(());
    }

    let mut col_job_id = Vec::with_capacity(jobs.len());
    let mut col_inactive_since = Vec::with_capacity(jobs.len());

    for (job, inactive_since) in jobs {
        col_job_id.push(job.0);
        // Timestamps are shipped as epoch seconds; pre-epoch times clamp to
        // zero rather than erroring.
        col_inactive_since.push(
            inactive_since
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs_f64(),
        );
    }

    {
        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.job
                SET inactive_since = to_timestamp(t.inactive_since)
                FROM
                    UNNEST(
                        $1::UUID[],
                        $2::DOUBLE PRECISION[]
                    ) AS t(job_id, inactive_since)
                WHERE
                    id = t.job_id;
                "#,
            )
            .await?;

        tx.execute(&stmt, &[&col_job_id, &col_inactive_since])
            .await?;
    }

    {
        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.schedule
                SET
                    active_job_id = NULL
                WHERE active_job_id = ANY($1::UUID[])
                "#,
            )
            .await?;

        tx.execute(&stmt, &[&col_job_id]).await?;
    }

    Ok(())
}