#![allow(missing_docs)]

use std::time::{Duration, UNIX_EPOCH};

use deadpool_postgres::{Pool, PoolError};
use futures::Stream;
use ora_backend::{
    Backend,
    common::{NextPageToken, TimeRange},
    executions::{
        ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
        SucceededExecution,
    },
    executors::ExecutorId,
    jobs::{CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId, NewJob},
    schedules::{
        PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters, ScheduleId,
        ScheduleOrderBy, StoppedSchedule,
    },
};

use refinery::embed_migrations;
use thiserror::Error;
use uuid::Uuid;

use crate::{
    db::{DbPool, DbTransaction},
    query::{
        jobs::{cancel_jobs, job_count, job_exists},
        schedules::{schedule_count, schedule_exists},
    },
};

embed_migrations!("./migrations");

mod db;
mod models;
mod query;

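/// A [`Backend`] implementation backed by PostgreSQL.
///
/// All state is kept in the `ora` schema, which is created (together with the
/// embedded migrations) by [`PostgresBackend::new`].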
pub struct PostgresBackend {
    pool: DbPool,
}

type Result<T> = core::result::Result<T, Error>;

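/// Errors that can be returned by the PostgreSQL backend.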
#[derive(Error, Debug)]
pub enum Error {
    #[error("{0}")]
    Migrations(#[from] refinery::Error),
    #[error("{0}")]
    Pool(#[from] PoolError),
    #[error("{0}")]
    Postgres(#[from] tokio_postgres::Error),
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    #[error("invalid page token: {0}")]
    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
}

impl PostgresBackend {
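    /// Create a backend from an existing connection pool.
    ///
    /// The `ora` schema is created if it does not exist and the embedded
    /// migrations are applied, tracked in the `ora.migrations` table.
    ///
    /// A minimal usage sketch, assuming a `deadpool_postgres` pool configured
    /// elsewhere (connection values below are placeholders):
    ///
    /// ```ignore
    /// let mut cfg = deadpool_postgres::Config::new();
    /// cfg.host = Some("localhost".into());
    /// cfg.user = Some("postgres".into());
    /// cfg.dbname = Some("ora".into());
    /// let pool = cfg.create_pool(Some(deadpool_postgres::Runtime::Tokio1), tokio_postgres::NoTls)?;
    /// let backend = PostgresBackend::new(pool).await?;
    /// ```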
    pub async fn new(pool: Pool) -> Result<Self> {
        let mut conn = pool.get().await?;
        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
        migrations::runner()
            .set_migration_table_name("ora.migrations")
            .run_async(&mut **conn)
            .await?;
        Ok(Self { pool: DbPool(pool) })
    }
}

impl Backend for PostgresBackend {
    type Error = Error;

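    // Bulk-upsert job types with UNNEST; existing rows are only rewritten when
    // at least one field actually differs from the stored value.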
    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
        if job_types.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        let mut col_id = Vec::with_capacity(job_types.len());
        let mut col_description = Vec::with_capacity(job_types.len());
        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
        let mut col_output_schema_json = Vec::with_capacity(job_types.len());

        for job_type in job_types {
            col_id.push(job_type.id.as_str());
            col_description.push(job_type.description.as_deref());
            col_input_schema_json.push(job_type.input_schema_json.as_deref());
            col_output_schema_json.push(job_type.output_schema_json.as_deref());
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job_type (
                        id,
                        description,
                        input_schema_json,
                        output_schema_json
                    ) SELECT * FROM UNNEST(
                        $1::TEXT[],
                        $2::TEXT[],
                        $3::TEXT[],
                        $4::TEXT[]
                    ) ON CONFLICT (id) DO UPDATE SET
                        description = EXCLUDED.description,
                        input_schema_json = EXCLUDED.input_schema_json,
                        output_schema_json = EXCLUDED.output_schema_json
                    WHERE
                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_description,
                    &col_input_schema_json,
                    &col_output_schema_json,
                ],
            )
            .await?;
        }

        tx.commit().await?;

        Ok(())
    }

    async fn list_job_types(&self) -> Result<Vec<JobType>> {
        let mut conn = self.pool.get().await?;

        let tx = conn.read_only_transaction().await?;

        let job_types = {
            let stmt = tx
                .prepare(
                    r#"--sql
                    SELECT
                        id,
                        description,
                        input_schema_json,
                        output_schema_json
                    FROM
                        ora.job_type
                    ORDER BY id
                    "#,
                )
                .await?;

            let rows = tx.query(&stmt, &[]).await?;

            rows.into_iter()
                .map(|row| {
                    Result::<_>::Ok(JobType {
                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
                        description: row.try_get(1)?,
                        input_schema_json: row.try_get(2)?,
                        output_schema_json: row.try_get(3)?,
                    })
                })
                .collect::<Result<Vec<_>>>()?
        };

        tx.commit().await?;

        Ok(job_types)
    }

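    // Insert jobs (plus their labels) in bulk, attach them to their schedules
    // as the active job, and create the initial execution for each one. When
    // `if_not_exists` matches an existing job, nothing is inserted.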
    async fn add_jobs(
        &self,
        jobs: &[NewJob],
        if_not_exists: Option<JobFilters>,
    ) -> Result<Vec<JobId>> {
        if jobs.is_empty() {
            return Ok(Vec::new());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists
            && job_exists(&tx, filters).await?
        {
            tx.commit().await?;
            return Ok(Vec::new());
        }

        let mut col_id = Vec::with_capacity(jobs.len());
        let mut col_schedule_id = Vec::with_capacity(jobs.len());

        {
            let mut col_job_type_id = Vec::with_capacity(jobs.len());
            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());

            for job in jobs {
                col_id.push(Uuid::now_v7());
                col_job_type_id.push(job.job.job_type_id.as_str());
                col_target_execution_time.push(job.job.target_execution_time);
                col_input_payload_json.push(job.job.input_payload_json.as_str());
                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
                col_schedule_id.push(job.schedule_id.map(|s| s.0));
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job (
                        id,
                        job_type_id,
                        target_execution_time,
                        input_payload_json,
                        timeout_policy_json,
                        retry_policy_json,
                        schedule_id
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TIMESTAMPTZ[],
                        $4::TEXT[],
                        $5::TEXT[],
                        $6::TEXT[],
                        $7::UUID[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_type_id,
                    &col_target_execution_time,
                    &col_input_payload_json,
                    &col_timeout_policy_json,
                    &col_retry_policy_json,
                    &col_schedule_id,
                ],
            )
            .await?;
        }

        {
            let mut col_job_id = Vec::with_capacity(jobs.len());
            let mut col_job_label_key = Vec::with_capacity(jobs.len());
            let mut col_job_label_value = Vec::with_capacity(jobs.len());

            for (i, job) in jobs.iter().enumerate() {
                for label in &job.job.labels {
                    col_job_id.push(col_id[i]);
                    col_job_label_key.push(label.key.as_str());
                    col_job_label_value.push(label.value.as_str());
                }
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job_label (
                        job_id,
                        label_key,
                        label_value
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[&col_job_id, &col_job_label_key, &col_job_label_value],
            )
            .await?;
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    UPDATE ora.schedule
                    SET
                        active_job_id = t.job_id
                    FROM UNNEST(
                        $1::UUID[],
                        $2::UUID[]
                    ) AS t(job_id, schedule_id)
                    WHERE
                        ora.schedule.id = t.schedule_id
                    "#,
                )
                .await?;

            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
        }

        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
        add_executions(&tx, &job_ids).await?;

        tx.commit().await?;

        Ok(job_ids)
    }

    async fn list_jobs(
        &self,
        filters: JobFilters,
        order_by: Option<JobOrderBy>,
        page_size: u32,
        page_token: Option<NextPageToken>,
    ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
        let mut conn = self.pool.get().await?;
        let tx = conn.read_only_transaction().await?;

        let (jobs, next_page_token) = query::jobs::job_details(
            &tx,
            filters,
            order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
            page_size,
            page_token.map(|t| t.0),
        )
        .await?;

        tx.commit().await?;

        Ok((jobs, next_page_token))
    }

    async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
        let mut conn = self.pool.get().await?;
        let tx = conn.read_only_transaction().await?;
        let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
        tx.commit().await?;
        Ok(count)
    }

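    // Cancel all jobs matching the filters, then mark them inactive and detach
    // them from their schedules.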
    async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
        let mut conn = self.pool.get().await?;
        let tx = conn.transaction().await?;
        let jobs = cancel_jobs(&tx, filters).await?;

        let job_ids = jobs.iter().map(|j| j.job_id).collect::<Vec<_>>();

        mark_jobs_inactive(&tx, &job_ids).await?;

        tx.commit().await?;

        Ok(jobs)
    }

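    // Bulk-insert schedule definitions via UNNEST, honouring the optional
    // `if_not_exists` filter.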
    async fn add_schedules(
        &self,
        schedules: &[ScheduleDefinition],
        if_not_exists: Option<ScheduleFilters>,
    ) -> Result<Vec<ScheduleId>> {
        if schedules.is_empty() {
            return Ok(Vec::new());
        }

        let mut conn = self.pool.get().await?;
        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists
            && schedule_exists(&tx, filters).await?
        {
            tx.commit().await?;
            return Ok(Vec::new());
        }

        let mut col_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_json = Vec::with_capacity(schedules.len());
        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
        let mut col_start_after = Vec::with_capacity(schedules.len());
        let mut col_end_before = Vec::with_capacity(schedules.len());

        for schedule in schedules {
            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);

            col_id.push(Uuid::now_v7());
            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
            col_start_after.push(start_after);
            col_end_before.push(end_before);
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.schedule (
                        id,
                        job_template_job_type_id,
                        job_template_json,
                        scheduling_policy_json,
                        start_after,
                        end_before
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[],
                        $4::TEXT[],
                        $5::TIMESTAMPTZ[],
                        $6::TIMESTAMPTZ[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_template_job_type_id,
                    &col_job_template_json,
                    &col_scheduling_policy_json,
                    &col_start_after,
                    &col_end_before,
                ],
            )
            .await?;
        }

        tx.commit().await?;

        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();

        Ok(schedule_ids)
    }

    async fn list_schedules(
        &self,
        filters: ScheduleFilters,
        order_by: Option<ScheduleOrderBy>,
        page_size: u32,
        page_token: Option<NextPageToken>,
    ) -> Result<(
        Vec<ScheduleDetails>,
        Option<ora_backend::common::NextPageToken>,
    )> {
        let mut conn = self.pool.get().await?;
        let tx = conn.read_only_transaction().await?;

        let (schedules, next_page_token) = query::schedules::schedule_details(
            &tx,
            filters,
            order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
            page_size,
            page_token.map(|t| t.0),
        )
        .await?;

        tx.commit().await?;

        Ok((schedules, next_page_token))
    }

    async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
        let mut conn = self.pool.get().await?;
        let tx = conn.read_only_transaction().await?;
        let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
        tx.commit().await?;
        Ok(count)
    }

    async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
        let mut conn = self.pool.get().await?;
        let tx = conn.transaction().await?;
        let schedules = query::schedules::stop_schedules(&tx, filters).await?;
        tx.commit().await?;
        Ok(schedules)
    }

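    // Stream pending executions (status 0) whose job's target execution time
    // has passed, in keyset-paginated batches of up to 1000 rows; the stream
    // ends once a batch comes back empty.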
    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
        async_stream::try_stream!({
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                let rows = if let Some(last_execution_id) = last_execution_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                                JOIN ora.job ON
                                    ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                                JOIN ora.job ON
                                    ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut ready_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    ready_executions.push(ReadyExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
                        input_payload_json: row.try_get(3)?,
                        attempt_number: row.try_get::<_, i64>(4)? as u64,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        target_execution_time: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
                    });
                }

                last_execution_id = ready_executions.last().map(|e| e.execution_id);

                yield ready_executions;
            }
        })
    }

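    // Wait until at least one pending execution is due: sleep until the
    // earliest target execution time, or poll every 500ms while nothing is
    // pending.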
    async fn wait_for_ready_executions(&self) -> crate::Result<()> {
        loop {
            let mut conn = self.pool.get().await?;

            let next_timestamp = {
                let tx = conn.read_only_transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        SELECT
                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
                        FROM
                            ora.job
                            JOIN ora.execution ON
                                ora.execution.job_id = ora.job.id
                        WHERE
                            ora.execution.status = 0
                        ORDER BY
                            ora.job.target_execution_time ASC
                        LIMIT 1
                        "#,
                    )
                    .await?;

                let row = tx.query_opt(&stmt, &[]).await?;

                tx.commit().await?;

                match row {
                    Some(row) => row
                        .try_get::<_, Option<f64>>(0)?
                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                    None => None,
                }
            };

            drop(conn);

            if let Some(next_timestamp) = next_timestamp {
                let now = std::time::SystemTime::now();

                if next_timestamp <= now {
                    break;
                }

                tokio::time::sleep(
                    next_timestamp
                        .duration_since(now)
                        .unwrap_or_else(|_| Duration::from_secs(0)),
                )
                .await;
                break;
            }

            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        Ok(())
    }

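    // Stream executions that have been picked up by an executor (status 1),
    // again in keyset-paginated batches of up to 1000 rows.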
    fn in_progress_executions(
        &self,
    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
        async_stream::try_stream!({
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                let rows = if let Some(last_execution_id) = last_execution_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                                JOIN ora.job ON
                                    ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                                JOIN ora.job ON
                                    ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut in_progress_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    in_progress_executions.push(InProgressExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        executor_id: ExecutorId(row.try_get(2)?),
                        target_execution_time: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
                        started_at: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
                        attempt_number: row.try_get::<_, i64>(7)? as u64,
                    });
                }

                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);

                yield in_progress_executions;
            }
        })
    }

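    // Record which executor picked up each execution and when it started;
    // executions that already have a start time are left untouched.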
    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.execution
                SET
                    executor_id = t.executor_id,
                    started_at = to_timestamp(t.started_at)
                FROM UNNEST(
                    $1::UUID[],
                    $2::UUID[],
                    $3::DOUBLE PRECISION[]
                ) AS t(execution_id, executor_id, started_at)
                WHERE
                    execution_id = id
                    AND ora.execution.started_at IS NULL
                "#,
            )
            .await?;

        let mut col_execution_id = Vec::with_capacity(executions.len());
        let mut col_executor_id = Vec::with_capacity(executions.len());
        let mut col_started_at = Vec::with_capacity(executions.len());

        for execution in executions {
            col_execution_id.push(execution.execution_id.0);
            col_executor_id.push(execution.executor_id.0);
            col_started_at.push(
                execution
                    .started_at
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
        }

        tx.execute(
            &stmt,
            &[&col_execution_id, &col_executor_id, &col_started_at],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

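    // Store the success timestamp and output for each execution, then mark the
    // affected jobs inactive and detach them from their schedules.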
    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.execution
                SET
                    succeeded_at = to_timestamp(t.succeeded_at),
                    output_json = t.output_json
                FROM UNNEST(
                    $1::UUID[],
                    $2::DOUBLE PRECISION[],
                    $3::TEXT[]
                ) AS t(execution_id, succeeded_at, output_json)
                WHERE
                    execution_id = id
                    AND status < 2
                RETURNING job_id
                "#,
            )
            .await?;

        let mut col_execution_id = Vec::with_capacity(executions.len());
        let mut col_succeeded_at = Vec::with_capacity(executions.len());
        let mut col_output_json = Vec::with_capacity(executions.len());

        for execution in executions {
            col_execution_id.push(execution.execution_id.0);
            col_succeeded_at.push(
                execution
                    .succeeded_at
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
            col_output_json.push(execution.output_json.as_str());
        }

        let rows = tx
            .query(
                &stmt,
                &[&col_execution_id, &col_succeeded_at, &col_output_json],
            )
            .await?;

        let mut job_ids = Vec::with_capacity(rows.len());
        for row in rows {
            job_ids.push(JobId(row.try_get(0)?));
        }

        mark_jobs_inactive(&tx, &job_ids).await?;

        tx.commit().await?;

        Ok(())
    }

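    // Record the failures and mark the affected jobs inactive; retries are
    // handled separately by `executions_retried`.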
    async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;
        let tx = conn.transaction().await?;

        let jobs = executions_failed(&tx, executions).await?;

        mark_jobs_inactive(&tx, &jobs).await?;

        tx.commit().await?;

        Ok(())
    }

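    // Record the failures as above, but instead of marking the jobs inactive,
    // queue a fresh execution for each affected job.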
    async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;
        let tx = conn.transaction().await?;

        let job_ids = executions_failed(&tx, executions).await?;
        add_executions(&tx, &job_ids).await?;

        tx.commit().await?;

        Ok(())
    }

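    // Stream schedules that are not stopped and currently have no active job,
    // together with the target time of the latest job (by id) created for each
    // schedule, in keyset-paginated batches of up to 1000 rows.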
    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
        async_stream::try_stream!({
            let mut last_schedule_id: Option<ScheduleId> = None;
            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                let rows = if let Some(last_schedule_id) = last_schedule_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                (
                                    ora.schedule.stopped_at IS NULL
                                    AND ora.schedule.active_job_id IS NULL
                                )
                                AND id > $1::UUID
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_schedule_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                ora.schedule.stopped_at IS NULL
                                AND ora.schedule.active_job_id IS NULL
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut schedules = Vec::with_capacity(rows.len());

                for row in rows {
                    schedules.push(PendingSchedule {
                        schedule_id: ScheduleId(row.try_get(0)?),
                        last_target_execution_time: row
                            .try_get::<_, Option<f64>>(1)?
                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
                        time_range: TimeRange {
                            start: row
                                .try_get::<_, Option<f64>>(3)?
                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                            end: row
                                .try_get::<_, Option<f64>>(4)?
                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                        },
                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
                    });
                }

                last_schedule_id = schedules.last().map(|s| s.schedule_id);
                yield schedules;
            }
        })
    }

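    // Garbage-collect history in small batches: delete inactive jobs whose last
    // execution finished before `before`, schedules stopped before `before`,
    // and job types that are no longer referenced, repeating until nothing is
    // left to delete.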
    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
        let mut conn = self.pool.get().await?;

        loop {
            let mut deleted_rows = 0;

            {
                let tx = conn.transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ora.job
                        WHERE id IN (
                            SELECT ora.job.id
                            FROM ora.job
                            JOIN LATERAL (
                                SELECT
                                    status,
                                    succeeded_at,
                                    failed_at,
                                    cancelled_at
                                FROM
                                    ora.execution
                                WHERE
                                    ora.execution.job_id = ora.job.id
                                ORDER BY ora.execution.id DESC
                                LIMIT 1
                            ) e ON TRUE
                            WHERE
                                ora.job.inactive
                                AND (
                                    e.succeeded_at < to_timestamp($1)
                                    OR e.failed_at < to_timestamp($1)
                                    OR e.cancelled_at < to_timestamp($1)
                                )
                            LIMIT 25
                            FOR UPDATE SKIP LOCKED
                        );
                        "#,
                    )
                    .await?;

                let deleted = tx
                    .execute(
                        &stmt,
                        &[&before
                            .duration_since(UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_secs_f64()],
                    )
                    .await?;

                deleted_rows += deleted;

                tx.commit().await?;
            }

            {
                let tx = conn.transaction().await?;
                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ONLY ora.schedule
                        WHERE ctid IN (
                            SELECT ctid
                            FROM ora.schedule
                            WHERE
                                ora.schedule.stopped_at < to_timestamp($1)
                            LIMIT 25
                            FOR UPDATE SKIP LOCKED
                        )
                        "#,
                    )
                    .await?;

                let deleted = tx
                    .execute(
                        &stmt,
                        &[&before
                            .duration_since(UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_secs_f64()],
                    )
                    .await?;

                tx.commit().await?;

                deleted_rows += deleted;
            }

            {
                let tx = conn.transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ora.job_type
                        WHERE
                            NOT EXISTS (
                                SELECT FROM ora.job
                                WHERE
                                    ora.job.job_type_id = ora.job_type.id
                            )
                            AND NOT EXISTS (
                                SELECT FROM ora.schedule
                                WHERE
                                    ora.schedule.job_template_job_type_id = ora.job_type.id
                            )
                        "#,
                    )
                    .await?;

                tx.execute(&stmt, &[]).await?;

                tx.commit().await?;
            }

            if deleted_rows == 0 {
                break;
            }
        }

        Ok(())
    }

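    // Poll every 500ms until at least one schedule is both not stopped and has
    // no active job.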
    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
        loop {
            let mut conn = self.pool.get().await?;

            let has_pending = {
                let tx = conn.read_only_transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        SELECT 1 FROM ora.schedule
                        WHERE
                            ora.schedule.stopped_at IS NULL
                            AND ora.schedule.active_job_id IS NULL
                        LIMIT 1
                        "#,
                    )
                    .await?;

                let row = tx.query_opt(&stmt, &[]).await?;

                tx.commit().await?;

                row.is_some()
            };

            drop(conn);

            if has_pending {
                break;
            }

            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        Ok(())
    }
}

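/// Record failure details for the given executions and return the ids of the
/// jobs they belong to, so the caller can either mark those jobs inactive or
/// schedule a retry.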
async fn executions_failed(
    tx: &DbTransaction<'_>,
    executions: &[FailedExecution],
) -> Result<Vec<JobId>> {
    let stmt = tx
        .prepare(
            r#"--sql
            UPDATE ora.execution
            SET
                failed_at = to_timestamp(t.failed_at),
                failure_reason = t.failure_reason
            FROM UNNEST(
                $1::UUID[],
                $2::DOUBLE PRECISION[],
                $3::TEXT[]
            ) AS t(execution_id, failed_at, failure_reason)
            WHERE
                execution_id = id
                AND status < 2
            RETURNING job_id;
            "#,
        )
        .await?;

    let mut col_execution_id = Vec::with_capacity(executions.len());
    let mut col_failed_at = Vec::with_capacity(executions.len());
    let mut col_failure_reason = Vec::with_capacity(executions.len());

    for execution in executions {
        col_execution_id.push(execution.execution_id.0);
        col_failed_at.push(
            execution
                .failed_at
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs_f64(),
        );
        col_failure_reason.push(execution.failure_reason.as_str());
    }

    let rows = tx
        .query(
            &stmt,
            &[&col_execution_id, &col_failed_at, &col_failure_reason],
        )
        .await?;

    let job_ids = rows
        .into_iter()
        .map(|row| Result::<_>::Ok(JobId(row.try_get(0)?)))
        .collect::<Result<Vec<_>>>()?;

    Ok(job_ids)
}

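/// Insert a fresh pending execution for each of the given jobs.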
async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
    if jobs.is_empty() {
        return Ok(());
    }

    let mut col_job_id = Vec::with_capacity(jobs.len());
    let mut col_execution_id = Vec::with_capacity(jobs.len());

    for job in jobs {
        col_job_id.push(job.0);
        col_execution_id.push(Uuid::now_v7());
    }

    let stmt = tx
        .prepare(
            r#"--sql
            INSERT INTO ora.execution (
                id,
                job_id
            ) SELECT * FROM UNNEST(
                $1::UUID[],
                $2::UUID[]
            )
            "#,
        )
        .await?;

    tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;

    Ok(())
}

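/// Mark the given jobs as inactive and clear them as the active job of any
/// schedule that still references them.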
async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
    if jobs.is_empty() {
        return Ok(());
    }

    let mut col_job_id = Vec::with_capacity(jobs.len());

    for job in jobs {
        col_job_id.push(job.0);
    }

    {
        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.job
                SET inactive = TRUE
                WHERE id = ANY($1::UUID[])
                "#,
            )
            .await?;

        tx.execute(&stmt, &[&col_job_id]).await?;
    }

    {
        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.schedule
                SET
                    active_job_id = NULL
                WHERE active_job_id = ANY($1::UUID[])
                "#,
            )
            .await?;

        tx.execute(&stmt, &[&col_job_id]).await?;
    }

    Ok(())
}