1#![allow(missing_docs)]
2
3use std::time::{Duration, UNIX_EPOCH};
4
5use deadpool_postgres::{Pool, PoolError};
6use futures::Stream;
7use ora_backend::{
8 Backend,
9 common::{NextPageToken, TimeRange},
10 executions::{
11 ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
12 SucceededExecution,
13 },
14 executors::ExecutorId,
15 jobs::{
16 AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
17 NewJob,
18 },
19 schedules::{
20 AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
21 ScheduleId, ScheduleOrderBy, StoppedSchedule,
22 },
23};
24
25use refinery::embed_migrations;
26use thiserror::Error;
27use uuid::Uuid;
28
29use crate::{
30 db::{DbPool, DbTransaction},
31 query::{
32 jobs::{cancel_jobs, job_count},
33 schedules::schedule_count,
34 },
35};
36
37embed_migrations!("./migrations");
38
39mod db;
40mod models;
41mod query;
42
/// PostgreSQL-backed implementation of the ora [`Backend`] trait.
///
/// All state lives in the `ora` schema; connections are drawn from a
/// deadpool-postgres pool wrapped in the crate-local [`DbPool`].
pub struct PostgresBackend {
    // Shared connection pool; every trait method checks out a connection
    // and runs its work inside a (read-only or writable) transaction.
    pool: DbPool,
}
46
/// Crate-local result alias using this backend's [`Error`] type.
type Result<T> = core::result::Result<T, Error>;
48
/// Errors produced by the Postgres backend.
#[derive(Error, Debug)]
pub enum Error {
    /// A refinery schema migration failed to apply.
    #[error("{0}")]
    Migrations(#[from] refinery::Error),
    /// A connection could not be obtained from the pool.
    #[error("{0}")]
    Pool(#[from] PoolError),
    /// A query, prepare, or transaction error from the driver.
    #[error("{0}")]
    Postgres(#[from] tokio_postgres::Error),
    /// (De)serialization of a JSON payload or policy column failed.
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    /// A caller-supplied pagination token could not be decoded
    /// (produced by the `query` module's pagination helpers).
    #[error("invalid page token: {0}")]
    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
}
63
impl PostgresBackend {
    /// Creates the backend, ensuring the `ora` schema exists and running all
    /// embedded refinery migrations (tracked in `ora.migrations`) before
    /// returning.
    ///
    /// # Errors
    /// Fails if a connection cannot be obtained, the schema cannot be
    /// created, or any migration fails.
    pub async fn new(pool: Pool) -> Result<Self> {
        let mut conn = pool.get().await?;
        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
        migrations::runner()
            .set_migration_table_name("ora.migrations")
            .run_async(&mut **conn)
            .await?;
        Ok(Self { pool: DbPool(pool) })
    }
}
77
78impl Backend for PostgresBackend {
79 type Error = Error;
80
    /// Upserts the given job type definitions into `ora.job_type`.
    ///
    /// All rows are written by a single bulk `INSERT ... SELECT FROM UNNEST`
    /// inside one transaction. On an `id` conflict the row is updated, but
    /// only when at least one column actually differs (the `IS DISTINCT
    /// FROM` clauses skip no-op writes).
    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
        // Nothing to write; avoid checking out a connection at all.
        if job_types.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        // Column-major buffers matching the UNNEST parameter arrays below.
        let mut col_id = Vec::with_capacity(job_types.len());
        let mut col_description = Vec::with_capacity(job_types.len());
        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
        let mut col_output_schema_json = Vec::with_capacity(job_types.len());

        for job_type in job_types {
            col_id.push(job_type.id.as_str());
            // Optional columns are pushed as Option<&str> → NULLs in SQL.
            col_description.push(job_type.description.as_deref());
            col_input_schema_json.push(job_type.input_schema_json.as_deref());
            col_output_schema_json.push(job_type.output_schema_json.as_deref());
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job_type (
                        id,
                        description,
                        input_schema_json,
                        output_schema_json
                    ) SELECT * FROM UNNEST(
                        $1::TEXT[],
                        $2::TEXT[],
                        $3::TEXT[],
                        $4::TEXT[]
                    ) ON CONFLICT (id) DO UPDATE SET
                        description = EXCLUDED.description,
                        input_schema_json = EXCLUDED.input_schema_json,
                        output_schema_json = EXCLUDED.output_schema_json
                    WHERE
                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_description,
                    &col_input_schema_json,
                    &col_output_schema_json,
                ],
            )
            .await?;
        }

        tx.commit().await?;

        Ok(())
    }
144
    /// Returns every registered job type, ordered by id.
    async fn list_job_types(&self) -> Result<Vec<JobType>> {
        let mut conn = self.pool.get().await?;

        let tx = conn.read_only_transaction().await?;

        let job_types = {
            let stmt = tx
                .prepare(
                    r#"--sql
                    SELECT
                        id,
                        description,
                        input_schema_json,
                        output_schema_json
                    FROM
                        ora.job_type
                    ORDER BY id
                    "#,
                )
                .await?;

            let rows = tx.query(&stmt, &[]).await?;

            rows.into_iter()
                .map(|row| {
                    Result::<_>::Ok(JobType {
                        // `new_unchecked`: the id round-trips from our own
                        // table, so re-validation is skipped here.
                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
                        description: row.try_get(1)?,
                        input_schema_json: row.try_get(2)?,
                        output_schema_json: row.try_get(3)?,
                    })
                })
                .collect::<Result<Vec<_>>>()?
        };

        tx.commit().await?;

        Ok(job_types)
    }
184
    /// Inserts new jobs (plus their labels) and wires them to their schedules.
    ///
    /// When `if_not_exists` is set and any job already matches those filters,
    /// nothing is inserted and the existing ids are returned instead.
    /// Otherwise, in one transaction: each job gets a fresh UUIDv7 id, rows
    /// are bulk-inserted into `ora.job`, labels into `ora.job_label`, owning
    /// schedules get their `active_job_id` pointed at the new job, and one
    /// initial execution is created per job via [`add_executions`].
    async fn add_jobs(
        &self,
        jobs: &[NewJob],
        if_not_exists: Option<JobFilters>,
    ) -> Result<AddedJobs> {
        if jobs.is_empty() {
            return Ok(AddedJobs::Added(Vec::new()));
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists {
            let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;

            if !existing_job_ids.is_empty() {
                // Nothing was written yet; commit simply ends the transaction.
                tx.commit().await?;
                return Ok(AddedJobs::Existing(existing_job_ids));
            }
        }

        // These two outlive the insert block below: the ids are reused for
        // labels, schedule linking, and execution creation.
        let mut col_id = Vec::with_capacity(jobs.len());
        let mut col_schedule_id = Vec::with_capacity(jobs.len());

        {
            // Column-major buffers matching the UNNEST parameter arrays.
            let mut col_job_type_id = Vec::with_capacity(jobs.len());
            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());

            for job in jobs {
                // UUIDv7 ids are time-ordered, which the streaming queries
                // elsewhere in this file rely on for keyset pagination.
                col_id.push(Uuid::now_v7());
                col_job_type_id.push(job.job.job_type_id.as_str());
                col_target_execution_time.push(job.job.target_execution_time);
                col_input_payload_json.push(job.job.input_payload_json.as_str());
                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
                // None → NULL schedule_id for ad-hoc (unscheduled) jobs.
                col_schedule_id.push(job.schedule_id.map(|s| s.0));
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job (
                        id,
                        job_type_id,
                        target_execution_time,
                        input_payload_json,
                        timeout_policy_json,
                        retry_policy_json,
                        schedule_id
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TIMESTAMPTZ[],
                        $4::TEXT[],
                        $5::TEXT[],
                        $6::TEXT[],
                        $7::UUID[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_type_id,
                    &col_target_execution_time,
                    &col_input_payload_json,
                    &col_timeout_policy_json,
                    &col_retry_policy_json,
                    &col_schedule_id,
                ],
            )
            .await?;
        }

        {
            // Labels are flattened: one row per (job, label) pair.
            let mut col_job_id = Vec::with_capacity(jobs.len());
            let mut col_job_label_key = Vec::with_capacity(jobs.len());
            let mut col_job_label_value = Vec::with_capacity(jobs.len());

            for (i, job) in jobs.iter().enumerate() {
                for label in &job.job.labels {
                    col_job_id.push(col_id[i]);
                    col_job_label_key.push(label.key.as_str());
                    col_job_label_value.push(label.value.as_str());
                }
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job_label (
                        job_id,
                        label_key,
                        label_value
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[&col_job_id, &col_job_label_key, &col_job_label_value],
            )
            .await?;
        }

        {
            // Point each owning schedule at its newly created job; rows with
            // a NULL schedule_id simply match nothing.
            let stmt = tx
                .prepare(
                    r#"--sql
                    UPDATE ora.schedule
                    SET
                        active_job_id = t.job_id
                    FROM UNNEST(
                        $1::UUID[],
                        $2::UUID[]
                    ) AS t(job_id, schedule_id)
                    WHERE
                        ora.schedule.id = t.schedule_id
                    "#,
                )
                .await?;

            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
        }

        // Every new job starts with exactly one pending execution.
        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
        add_executions(&tx, &job_ids).await?;

        tx.commit().await?;

        Ok(AddedJobs::Added(job_ids))
    }
329
330 async fn list_jobs(
331 &self,
332 filters: JobFilters,
333 order_by: Option<JobOrderBy>,
334 page_size: u32,
335 page_token: Option<NextPageToken>,
336 ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
337 let mut conn = self.pool.get().await?;
338 let tx = conn.read_only_transaction().await?;
339
340 let (jobs, next_page_token) = query::jobs::job_details(
341 &tx,
342 filters,
343 order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
344 page_size,
345 page_token.map(|t| t.0),
346 )
347 .await?;
348
349 tx.commit().await?;
350
351 Ok((jobs, next_page_token))
352 }
353
354 async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
355 let mut conn = self.pool.get().await?;
356 let tx = conn.read_only_transaction().await?;
357 let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
358 tx.commit().await?;
359 Ok(count)
360 }
361
362 async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
363 let mut conn = self.pool.get().await?;
364 let tx = conn.transaction().await?;
365 let jobs = cancel_jobs(&tx, filters).await?;
366
367 let job_ids = jobs.iter().map(|j| j.job_id).collect::<Vec<_>>();
368
369 mark_jobs_inactive(&tx, &job_ids).await?;
370
371 tx.commit().await?;
372
373 Ok(jobs)
374 }
375
    /// Inserts new schedule definitions (plus their labels).
    ///
    /// When `if_not_exists` is set and any schedule already matches those
    /// filters, nothing is inserted and the existing ids are returned.
    /// Otherwise each schedule gets a fresh UUIDv7 id and both the schedule
    /// rows and their labels are bulk-inserted in a single transaction.
    async fn add_schedules(
        &self,
        schedules: &[ScheduleDefinition],
        if_not_exists: Option<ScheduleFilters>,
    ) -> Result<AddedSchedules> {
        if schedules.is_empty() {
            return Ok(AddedSchedules::Added(Vec::new()));
        }

        let mut conn = self.pool.get().await?;
        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists {
            let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;

            if !existing_schedule_ids.is_empty() {
                // Nothing was written yet; commit simply ends the transaction.
                tx.commit().await?;
                return Ok(AddedSchedules::Existing(existing_schedule_ids));
            }
        }

        // Column-major buffers matching the UNNEST parameter arrays below.
        let mut col_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_json = Vec::with_capacity(schedules.len());
        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
        let mut col_start_after = Vec::with_capacity(schedules.len());
        let mut col_end_before = Vec::with_capacity(schedules.len());

        for schedule in schedules {
            // Open-ended ranges map to NULL start_after / end_before.
            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);

            col_id.push(Uuid::now_v7());
            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
            // Template and policy are stored as JSON text columns.
            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
            col_start_after.push(start_after);
            col_end_before.push(end_before);
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.schedule (
                        id,
                        job_template_job_type_id,
                        job_template_json,
                        scheduling_policy_json,
                        start_after,
                        end_before
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[],
                        $4::TEXT[],
                        $5::TIMESTAMPTZ[],
                        $6::TIMESTAMPTZ[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_template_job_type_id,
                    &col_job_template_json,
                    &col_scheduling_policy_json,
                    &col_start_after,
                    &col_end_before,
                ],
            )
            .await?;
        }

        {
            // Labels are flattened: one row per (schedule, label) pair.
            let mut col_schedule_id = Vec::with_capacity(schedules.len());
            let mut col_schedule_label_key = Vec::with_capacity(schedules.len());
            let mut col_schedule_label_value = Vec::with_capacity(schedules.len());

            for (i, schedule) in schedules.iter().enumerate() {
                for label in &schedule.labels {
                    col_schedule_id.push(col_id[i]);
                    col_schedule_label_key.push(label.key.as_str());
                    col_schedule_label_value.push(label.value.as_str());
                }
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.schedule_label (
                        schedule_id,
                        label_key,
                        label_value
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_schedule_id,
                    &col_schedule_label_key,
                    &col_schedule_label_value,
                ],
            )
            .await?;
        }

        tx.commit().await?;

        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();

        Ok(AddedSchedules::Added(schedule_ids))
    }
498
499 async fn list_schedules(
500 &self,
501 filters: ScheduleFilters,
502 order_by: Option<ScheduleOrderBy>,
503 page_size: u32,
504 page_token: Option<NextPageToken>,
505 ) -> Result<(
506 Vec<ScheduleDetails>,
507 Option<ora_backend::common::NextPageToken>,
508 )> {
509 let mut conn = self.pool.get().await?;
510 let tx = conn.read_only_transaction().await?;
511
512 let (schedules, next_page_token) = query::schedules::schedule_details(
513 &tx,
514 filters,
515 order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
516 page_size,
517 page_token.map(|t| t.0),
518 )
519 .await?;
520
521 tx.commit().await?;
522
523 Ok((schedules, next_page_token))
524 }
525
526 async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
527 let mut conn = self.pool.get().await?;
528 let tx = conn.read_only_transaction().await?;
529 let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
530 tx.commit().await?;
531 Ok(count)
532 }
533
534 async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
535 let mut conn = self.pool.get().await?;
536 let tx = conn.transaction().await?;
537 let schedules = query::schedules::stop_schedules(&tx, filters).await?;
538 tx.commit().await?;
539 Ok(schedules)
540 }
541
    /// Streams batches (up to 1000 rows) of executions that are ready to run:
    /// executions with `status = 0` whose job's `target_execution_time` has
    /// passed. (Per this file's usage, status 0 appears to mean "pending" —
    /// confirm against the migrations/schema.)
    ///
    /// Pages forward keyset-style on the execution id (UUIDv7, so roughly
    /// insertion-ordered) and ends the stream once a page comes back empty.
    /// `attempt_number` is derived as the total number of execution rows
    /// recorded for the job so far.
    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
        async_stream::try_stream!({
            // Keyset cursor: the last execution id yielded so far.
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                // A fresh connection/transaction per page keeps the pool free
                // between yields.
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                // Same query twice: with and without the `id > $1` cursor
                // predicate for subsequent vs. first pages.
                let rows = if let Some(last_execution_id) = last_execution_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                // An empty page means we've drained everything ready right now.
                if rows.is_empty() {
                    break;
                }

                let mut ready_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    ready_executions.push(ReadyExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
                        input_payload_json: row.try_get(3)?,
                        attempt_number: row.try_get::<_, i64>(4)? as u64,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        // Timestamps travel as epoch seconds (f64); sub-second
                        // precision is limited by the double conversion.
                        target_execution_time: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
                    });
                }

                // Advance the cursor to the last id in this (ASC-ordered) page.
                last_execution_id = ready_executions.last().map(|e| e.execution_id);

                yield ready_executions;
            }
        })
    }
634
    /// Blocks until some pending execution (not listed in `ignore`) should be
    /// ready to run.
    ///
    /// Looks up the earliest `target_execution_time` among pending
    /// (`status = 0`) executions. If it is already due, returns immediately;
    /// if it is in the future, sleeps until that instant and then returns
    /// WITHOUT re-querying (callers are expected to fetch ready executions
    /// next). If no pending execution exists, polls again every 500 ms.
    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();

        loop {
            let mut conn = self.pool.get().await?;

            let next_timestamp = {
                let tx = conn.read_only_transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        SELECT
                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
                        FROM
                            ora.job
                        JOIN ora.execution ON
                            ora.execution.job_id = ora.job.id
                        WHERE
                            ora.execution.status = 0
                            AND NOT (ora.execution.id = ANY($1::UUID[]))
                        ORDER BY
                            ora.job.target_execution_time ASC
                        LIMIT 1
                        "#,
                    )
                    .await?;

                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;

                tx.commit().await?;

                // The column itself may be NULL, hence the nested Option.
                match row {
                    Some(row) => row
                        .try_get::<_, Option<f64>>(0)?
                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                    None => None,
                }
            };

            // Return the connection to the pool before sleeping.
            drop(conn);

            if let Some(next_timestamp) = next_timestamp {
                let now = std::time::SystemTime::now();

                // Already due: nothing to wait for.
                if next_timestamp <= now {
                    break;
                }

                // Sleep exactly until the next target time, then return.
                // duration_since only errs if the clock moved; treat as "now".
                tokio::time::sleep(
                    next_timestamp
                        .duration_since(now)
                        .unwrap_or_else(|_| Duration::from_secs(0)),
                )
                .await;
                break;
            }

            // No pending executions at all: re-poll after a short delay.
            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        Ok(())
    }
698
    /// Streams batches (up to 1000 rows) of executions currently running
    /// (`status = 1`, which per this file's usage appears to mean "started"
    /// — confirm against the migrations/schema).
    ///
    /// Pages forward keyset-style on the execution id and ends the stream
    /// once a page comes back empty. `attempt_number` is the total number of
    /// execution rows recorded for the job so far.
    fn in_progress_executions(
        &self,
    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
        async_stream::try_stream!({
            // Keyset cursor: the last execution id yielded so far.
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                // Same query twice: with and without the `id > $1` cursor
                // predicate for subsequent vs. first pages.
                let rows = if let Some(last_execution_id) = last_execution_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                // An empty page means there is nothing more in progress.
                if rows.is_empty() {
                    break;
                }

                let mut in_progress_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    in_progress_executions.push(InProgressExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        executor_id: ExecutorId(row.try_get(2)?),
                        // Timestamps travel as epoch seconds (f64).
                        target_execution_time: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
                        started_at: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
                        attempt_number: row.try_get::<_, i64>(7)? as u64,
                    });
                }

                // Advance the cursor to the last id in this (ASC-ordered) page.
                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);

                yield in_progress_executions;
            }
        })
    }
795
    /// Records which executor picked up each execution and when it started.
    ///
    /// The `started_at IS NULL` guard makes this first-write-wins: repeated
    /// start reports for the same execution are silently ignored.
    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.execution
                SET
                    executor_id = t.executor_id,
                    started_at = to_timestamp(t.started_at)
                FROM UNNEST(
                    $1::UUID[],
                    $2::UUID[],
                    $3::DOUBLE PRECISION[]
                ) AS t(execution_id, executor_id, started_at)
                WHERE
                    execution_id = id
                    AND ora.execution.started_at IS NULL
                "#,
            )
            .await?;

        // Column-major buffers matching the UNNEST parameter arrays above.
        let mut col_execution_id = Vec::with_capacity(executions.len());
        let mut col_executor_id = Vec::with_capacity(executions.len());
        let mut col_started_at = Vec::with_capacity(executions.len());

        for execution in executions {
            col_execution_id.push(execution.execution_id.0);
            col_executor_id.push(execution.executor_id.0);
            // Timestamps travel as epoch seconds (f64); a pre-epoch clock
            // degrades to 0 rather than erroring.
            col_started_at.push(
                execution
                    .started_at
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
        }

        tx.execute(
            &stmt,
            &[&col_execution_id, &col_executor_id, &col_started_at],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }
850
    /// Records successful completions and marks the affected jobs inactive.
    ///
    /// Only executions whose `status` is below 2 are updated, so already
    /// finished executions are untouched. Note this UPDATE never writes
    /// `status` directly — it appears to be maintained elsewhere (e.g. a
    /// generated column or trigger); confirm against the migrations.
    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.execution
                SET
                    succeeded_at = to_timestamp(t.succeeded_at),
                    output_json = t.output_json
                FROM UNNEST(
                    $1::UUID[],
                    $2::DOUBLE PRECISION[],
                    $3::TEXT[]
                ) AS t(execution_id, succeeded_at, output_json)
                WHERE
                    execution_id = id
                    AND status < 2
                RETURNING job_id
                "#,
            )
            .await?;

        // Column-major buffers matching the UNNEST parameter arrays above.
        let mut col_execution_id = Vec::with_capacity(executions.len());
        let mut col_succeeded_at = Vec::with_capacity(executions.len());
        let mut col_output_json = Vec::with_capacity(executions.len());

        for execution in executions {
            col_execution_id.push(execution.execution_id.0);
            // Timestamps travel as epoch seconds (f64).
            col_succeeded_at.push(
                execution
                    .succeeded_at
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
            col_output_json.push(execution.output_json.as_str());
        }

        let rows = tx
            .query(
                &stmt,
                &[&col_execution_id, &col_succeeded_at, &col_output_json],
            )
            .await?;

        // RETURNING gives us the jobs that actually transitioned; only those
        // are flagged inactive (and detached from their schedules).
        let mut job_ids = Vec::with_capacity(rows.len());
        for row in rows {
            job_ids.push(JobId(row.try_get(0)?));
        }

        mark_jobs_inactive(&tx, &job_ids).await?;

        tx.commit().await?;

        Ok(())
    }
914
915 async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
916 if executions.is_empty() {
917 return Ok(());
918 }
919
920 let mut conn = self.pool.get().await?;
921 let tx = conn.transaction().await?;
922
923 let jobs = executions_failed(&tx, executions).await?;
924
925 mark_jobs_inactive(&tx, &jobs).await?;
926
927 tx.commit().await?;
928
929 Ok(())
930 }
931
932 async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
933 if executions.is_empty() {
934 return Ok(());
935 }
936
937 let mut conn = self.pool.get().await?;
938 let tx = conn.transaction().await?;
939
940 let job_ids = executions_failed(&tx, executions).await?;
941 add_executions(&tx, &job_ids).await?;
942
943 tx.commit().await?;
944
945 Ok(())
946 }
947
    /// Streams batches (up to 1000 rows) of schedules that need a new job:
    /// schedules that are not stopped and have no `active_job_id`.
    ///
    /// Each row also carries the target execution time of the schedule's most
    /// recent job (latest by job id — UUIDv7, so roughly creation order), so
    /// the scheduler can compute the next occurrence. Pages forward
    /// keyset-style on the schedule id and ends once a page comes back empty.
    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
        async_stream::try_stream!({
            // Keyset cursor: the last schedule id yielded so far.
            let mut last_schedule_id: Option<ScheduleId> = None;
            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                // Same query twice: with and without the `id > $1` cursor
                // predicate for subsequent vs. first pages.
                let rows = if let Some(last_schedule_id) = last_schedule_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                (
                                    ora.schedule.stopped_at IS NULL
                                    AND ora.schedule.active_job_id IS NULL
                                )
                                AND id > $1::UUID
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_schedule_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                ora.schedule.stopped_at IS NULL
                                AND ora.schedule.active_job_id IS NULL
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                // An empty page means nothing is pending right now.
                if rows.is_empty() {
                    break;
                }

                let mut schedules = Vec::with_capacity(rows.len());

                for row in rows {
                    schedules.push(PendingSchedule {
                        schedule_id: ScheduleId(row.try_get(0)?),
                        // NULL when the schedule has spawned no job yet.
                        last_target_execution_time: row
                            .try_get::<_, Option<f64>>(1)?
                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
                        // Either bound may be NULL for an open-ended range.
                        time_range: TimeRange {
                            start: row
                                .try_get::<_, Option<f64>>(3)?
                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                            end: row
                                .try_get::<_, Option<f64>>(4)?
                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                        },
                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
                    });
                }

                // Advance the cursor to the last id in this (ASC-ordered) page.
                last_schedule_id = schedules.last().map(|s| s.schedule_id);
                yield schedules;
            }
        })
    }
1056
    /// Permanently deletes finished history older than `before`.
    ///
    /// Works in small batches (25 rows per statement, `FOR UPDATE SKIP
    /// LOCKED` so concurrent cleaners don't block each other) and loops until
    /// a full pass deletes nothing. Each pass, in its own transactions:
    /// 1. inactive jobs whose *latest* execution finished (succeeded, failed,
    ///    or was cancelled) before the cutoff;
    /// 2. schedules stopped before the cutoff;
    /// 3. job types no longer referenced by any job or schedule — this GC
    ///    step is deliberately NOT counted toward `deleted_rows`, so it can
    ///    never keep the loop alive by itself.
    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
        let mut conn = self.pool.get().await?;

        loop {
            let mut deleted_rows = 0;

            {
                let tx = conn.transaction().await?;

                // LATERAL picks each job's most recent execution; a job is
                // purged only if it is inactive and that execution ended
                // before the cutoff.
                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ora.job
                        WHERE id IN (
                            SELECT ora.job.id
                            FROM ora.job
                            JOIN LATERAL (
                                SELECT
                                    status,
                                    succeeded_at,
                                    failed_at,
                                    cancelled_at
                                FROM
                                    ora.execution
                                WHERE
                                    ora.execution.job_id = ora.job.id
                                ORDER BY ora.execution.id DESC
                                LIMIT 1
                            ) e ON TRUE
                            WHERE
                                ora.job.inactive
                                AND (
                                    e.succeeded_at < to_timestamp($1)
                                    OR e.failed_at < to_timestamp($1)
                                    OR e.cancelled_at < to_timestamp($1)
                                )
                            LIMIT 25
                            FOR UPDATE SKIP LOCKED
                        );
                        "#,
                    )
                    .await?;

                // Cutoff travels as epoch seconds (f64); a pre-epoch value
                // degrades to 0 rather than erroring.
                let deleted = tx
                    .execute(
                        &stmt,
                        &[&before
                            .duration_since(UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_secs_f64()],
                    )
                    .await?;

                deleted_rows += deleted;

                tx.commit().await?;
            }

            {
                let tx = conn.transaction().await?;
                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ONLY ora.schedule
                        WHERE ctid IN (
                            SELECT ctid
                            FROM ora.schedule
                            WHERE
                                ora.schedule.stopped_at < to_timestamp($1)
                            LIMIT 25
                            FOR UPDATE SKIP LOCKED
                        )
                        "#,
                    )
                    .await?;

                let deleted = tx
                    .execute(
                        &stmt,
                        &[&before
                            .duration_since(UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_secs_f64()],
                    )
                    .await?;

                tx.commit().await?;

                deleted_rows += deleted;
            }

            {
                let tx = conn.transaction().await?;

                // Garbage-collect orphaned job types (see note in the doc
                // comment: intentionally not counted toward deleted_rows).
                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ora.job_type
                        WHERE
                            NOT EXISTS (
                                SELECT FROM ora.job
                                WHERE
                                    ora.job.job_type_id = ora.job_type.id
                            )
                            AND NOT EXISTS (
                                SELECT FROM ora.schedule
                                WHERE
                                    ora.schedule.job_template_job_type_id = ora.job_type.id
                            )
                        "#,
                    )
                    .await?;

                tx.execute(&stmt, &[]).await?;

                tx.commit().await?;
            }

            // A pass that purged nothing means the backlog is drained.
            if deleted_rows == 0 {
                break;
            }
        }

        Ok(())
    }
1184
    /// Blocks until at least one schedule is pending — i.e. not stopped and
    /// with no active job — polling every 500 ms.
    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
        loop {
            let mut conn = self.pool.get().await?;

            let has_pending = {
                let tx = conn.read_only_transaction().await?;

                // Existence check only; the actual rows are fetched by
                // `pending_schedules`.
                let stmt = tx
                    .prepare(
                        r#"--sql
                        SELECT 1 FROM ora.schedule
                        WHERE
                            ora.schedule.stopped_at IS NULL
                            AND ora.schedule.active_job_id IS NULL
                        LIMIT 1
                        "#,
                    )
                    .await?;

                let row = tx.query_opt(&stmt, &[]).await?;

                tx.commit().await?;

                row.is_some()
            };

            // Return the connection to the pool before sleeping.
            drop(conn);

            if has_pending {
                break;
            }

            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        Ok(())
    }
1222}
1223
1224async fn executions_failed(
1225 tx: &DbTransaction<'_>,
1226 executions: &[FailedExecution],
1227) -> Result<Vec<JobId>> {
1228 let stmt = tx
1229 .prepare(
1230 r#"--sql
1231 UPDATE ora.execution
1232 SET
1233 failed_at = to_timestamp(t.failed_at),
1234 failure_reason = t.failure_reason
1235 FROM UNNEST(
1236 $1::UUID[],
1237 $2::DOUBLE PRECISION[],
1238 $3::TEXT[]
1239 ) AS t(execution_id, failed_at, failure_reason)
1240 WHERE
1241 execution_id = id
1242 AND status < 2
1243 RETURNING job_id;
1244 "#,
1245 )
1246 .await?;
1247
1248 let mut col_execution_id = Vec::with_capacity(executions.len());
1249 let mut col_succeeded_at = Vec::with_capacity(executions.len());
1250 let mut col_failure_reason = Vec::with_capacity(executions.len());
1251
1252 for execution in executions {
1253 col_execution_id.push(execution.execution_id.0);
1254 col_succeeded_at.push(
1255 execution
1256 .failed_at
1257 .duration_since(UNIX_EPOCH)
1258 .unwrap_or_default()
1259 .as_secs_f64(),
1260 );
1261 col_failure_reason.push(execution.failure_reason.as_str());
1262 }
1263
1264 let rows = tx
1265 .query(
1266 &stmt,
1267 &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1268 )
1269 .await?;
1270
1271 let job_ids = rows.into_iter().map(|row| JobId(row.get(0))).collect();
1272
1273 Ok(job_ids)
1274}
1275
1276async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1277 if jobs.is_empty() {
1278 return Ok(());
1279 }
1280
1281 let mut col_job_id = Vec::with_capacity(jobs.len());
1282 let mut col_execution_id = Vec::with_capacity(jobs.len());
1283
1284 for job in jobs {
1285 col_job_id.push(job.0);
1286 col_execution_id.push(Uuid::now_v7());
1287 }
1288
1289 let stmt = tx
1290 .prepare(
1291 r#"--sql
1292 INSERT INTO ora.execution (
1293 id,
1294 job_id
1295 ) SELECT * FROM UNNEST(
1296 $1::UUID[],
1297 $2::UUID[]
1298 )
1299 "#,
1300 )
1301 .await?;
1302
1303 tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1304
1305 Ok(())
1306}
1307
1308async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1309 if jobs.is_empty() {
1310 return Ok(());
1311 }
1312
1313 let mut col_job_id = Vec::with_capacity(jobs.len());
1314
1315 for job in jobs {
1316 col_job_id.push(job.0);
1317 }
1318
1319 {
1320 let stmt = tx
1321 .prepare(
1322 r#"--sql
1323 UPDATE ora.job
1324 SET inactive = TRUE
1325 WHERE id = ANY($1::UUID[])
1326 "#,
1327 )
1328 .await?;
1329
1330 tx.execute(&stmt, &[&col_job_id]).await?;
1331 }
1332
1333 {
1334 let stmt = tx
1335 .prepare(
1336 r#"--sql
1337 UPDATE ora.schedule
1338 SET
1339 active_job_id = NULL
1340 WHERE active_job_id = ANY($1::UUID[])
1341 "#,
1342 )
1343 .await?;
1344
1345 tx.execute(&stmt, &[&col_job_id]).await?;
1346 }
1347
1348 Ok(())
1349}