1#![allow(missing_docs)]
2
3use std::time::{Duration, UNIX_EPOCH};
4
5use deadpool_postgres::{Pool, PoolError};
6use futures::Stream;
7use ora_backend::{
8 Backend,
9 common::{NextPageToken, TimeRange},
10 executions::{
11 ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
12 SucceededExecution,
13 },
14 executors::ExecutorId,
15 jobs::{
16 AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
17 NewJob,
18 },
19 schedules::{
20 AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
21 ScheduleId, ScheduleOrderBy, StoppedSchedule,
22 },
23};
24
25use refinery::embed_migrations;
26use thiserror::Error;
27use uuid::Uuid;
28
29use crate::{
30 db::{DbPool, DbTransaction},
31 query::{
32 jobs::{cancel_jobs, job_count},
33 schedules::schedule_count,
34 },
35};
36
37embed_migrations!("./migrations");
38
39mod db;
40mod models;
41mod query;
42
43pub struct PostgresBackend {
44 pool: DbPool,
45}
46
47type Result<T> = core::result::Result<T, Error>;
48
49#[derive(Error, Debug)]
51pub enum Error {
52 #[error("{0}")]
53 Migrations(#[from] refinery::Error),
54 #[error("{0}")]
55 Pool(#[from] PoolError),
56 #[error("{0}")]
57 Postgres(#[from] tokio_postgres::Error),
58 #[error("{0}")]
59 Serde(#[from] serde_json::Error),
60 #[error("invalid page token: {0}")]
61 InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
62}
63
64impl PostgresBackend {
65 pub async fn new(pool: Pool) -> Result<Self> {
68 let mut conn = pool.get().await?;
69 conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
70 migrations::runner()
71 .set_migration_table_name("ora.migrations")
72 .run_async(&mut **conn)
73 .await?;
74 Ok(Self { pool: DbPool(pool) })
75 }
76}
77
78impl Backend for PostgresBackend {
79 type Error = Error;
80
81 async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
82 if job_types.is_empty() {
83 return Ok(());
84 }
85
86 let mut conn = self.pool.get().await?;
87
88 let tx = conn.transaction().await?;
89
90 let mut col_id = Vec::with_capacity(job_types.len());
91 let mut col_description = Vec::with_capacity(job_types.len());
92 let mut col_input_schema_json = Vec::with_capacity(job_types.len());
93 let mut col_output_schema_json = Vec::with_capacity(job_types.len());
94
95 for job_type in job_types {
96 col_id.push(job_type.id.as_str());
97 col_description.push(job_type.description.as_deref());
98 col_input_schema_json.push(job_type.input_schema_json.as_deref());
99 col_output_schema_json.push(job_type.output_schema_json.as_deref());
100 }
101
102 {
103 let stmt = tx
104 .prepare(
105 r#"--sql
106 INSERT INTO ora.job_type (
107 id,
108 description,
109 input_schema_json,
110 output_schema_json
111 ) SELECT * FROM UNNEST(
112 $1::TEXT[],
113 $2::TEXT[],
114 $3::TEXT[],
115 $4::TEXT[]
116 ) ON CONFLICT (id) DO UPDATE SET
117 description = EXCLUDED.description,
118 input_schema_json = EXCLUDED.input_schema_json,
119 output_schema_json = EXCLUDED.output_schema_json
120 WHERE
121 ora.job_type.description IS DISTINCT FROM EXCLUDED.description
122 OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
123 OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
124 "#,
125 )
126 .await?;
127
128 tx.execute(
129 &stmt,
130 &[
131 &col_id,
132 &col_description,
133 &col_input_schema_json,
134 &col_output_schema_json,
135 ],
136 )
137 .await?;
138 }
139
140 tx.commit().await?;
141
142 Ok(())
143 }
144
145 async fn list_job_types(&self) -> Result<Vec<JobType>> {
146 let mut conn = self.pool.get().await?;
147
148 let tx = conn.read_only_transaction().await?;
149
150 let job_types = {
151 let stmt = tx
152 .prepare(
153 r#"--sql
154 SELECT
155 id,
156 description,
157 input_schema_json,
158 output_schema_json
159 FROM
160 ora.job_type
161 ORDER BY id
162 "#,
163 )
164 .await?;
165
166 let rows = tx.query(&stmt, &[]).await?;
167
168 rows.into_iter()
169 .map(|row| {
170 Result::<_>::Ok(JobType {
171 id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
172 description: row.try_get(1)?,
173 input_schema_json: row.try_get(2)?,
174 output_schema_json: row.try_get(3)?,
175 })
176 })
177 .collect::<Result<Vec<_>>>()?
178 };
179
180 tx.commit().await?;
181
182 Ok(job_types)
183 }
184
185 async fn add_jobs(
186 &self,
187 jobs: &[NewJob],
188 if_not_exists: Option<JobFilters>,
189 ) -> Result<AddedJobs> {
190 if jobs.is_empty() {
191 return Ok(AddedJobs::Added(Vec::new()));
192 }
193
194 let mut conn = self.pool.get().await?;
195
196 let tx = conn.transaction().await?;
197
198 if let Some(filters) = if_not_exists {
199 let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;
200
201 if !existing_job_ids.is_empty() {
202 tx.commit().await?;
203 return Ok(AddedJobs::Existing(existing_job_ids));
204 }
205 }
206
207 let mut col_id = Vec::with_capacity(jobs.len());
208 let mut col_schedule_id = Vec::with_capacity(jobs.len());
209
210 {
211 let mut col_job_type_id = Vec::with_capacity(jobs.len());
212 let mut col_target_execution_time = Vec::with_capacity(jobs.len());
213 let mut col_input_payload_json = Vec::with_capacity(jobs.len());
214 let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
215 let mut col_retry_policy_json = Vec::with_capacity(jobs.len());
216
217 for job in jobs {
218 col_id.push(Uuid::now_v7());
219 col_job_type_id.push(job.job.job_type_id.as_str());
220 col_target_execution_time.push(job.job.target_execution_time);
221 col_input_payload_json.push(job.job.input_payload_json.as_str());
222 col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
223 col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
224 col_schedule_id.push(job.schedule_id.map(|s| s.0));
225 }
226
227 let stmt = tx
228 .prepare(
229 r#"--sql
230 INSERT INTO ora.job (
231 id,
232 job_type_id,
233 target_execution_time,
234 input_payload_json,
235 timeout_policy_json,
236 retry_policy_json,
237 schedule_id
238 ) SELECT * FROM UNNEST(
239 $1::UUID[],
240 $2::TEXT[],
241 $3::TIMESTAMPTZ[],
242 $4::TEXT[],
243 $5::TEXT[],
244 $6::TEXT[],
245 $7::UUID[]
246 )
247 "#,
248 )
249 .await?;
250
251 tx.execute(
252 &stmt,
253 &[
254 &col_id,
255 &col_job_type_id,
256 &col_target_execution_time,
257 &col_input_payload_json,
258 &col_timeout_policy_json,
259 &col_retry_policy_json,
260 &col_schedule_id,
261 ],
262 )
263 .await?;
264 }
265
266 {
267 let mut col_job_id = Vec::with_capacity(jobs.len());
268 let mut col_job_label_key = Vec::with_capacity(jobs.len());
269 let mut col_job_label_value = Vec::with_capacity(jobs.len());
270
271 for (i, job) in jobs.iter().enumerate() {
272 for label in &job.job.labels {
273 col_job_id.push(col_id[i]);
274 col_job_label_key.push(label.key.as_str());
275 col_job_label_value.push(label.value.as_str());
276 }
277 }
278
279 let stmt = tx
280 .prepare(
281 r#"--sql
282 INSERT INTO ora.job_label (
283 job_id,
284 label_key,
285 label_value
286 ) SELECT * FROM UNNEST(
287 $1::UUID[],
288 $2::TEXT[],
289 $3::TEXT[]
290 )
291 "#,
292 )
293 .await?;
294
295 tx.execute(
296 &stmt,
297 &[&col_job_id, &col_job_label_key, &col_job_label_value],
298 )
299 .await?;
300 }
301
302 {
303 let stmt = tx
304 .prepare(
305 r#"--sql
306 UPDATE ora.schedule
307 SET
308 active_job_id = t.job_id
309 FROM UNNEST(
310 $1::UUID[],
311 $2::UUID[]
312 ) AS t(job_id, schedule_id)
313 WHERE
314 ora.schedule.id = t.schedule_id
315 "#,
316 )
317 .await?;
318
319 tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
320 }
321
322 let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
323 add_executions(&tx, &job_ids).await?;
324
325 tx.commit().await?;
326
327 Ok(AddedJobs::Added(job_ids))
328 }
329
330 async fn list_jobs(
331 &self,
332 filters: JobFilters,
333 order_by: Option<JobOrderBy>,
334 page_size: u32,
335 page_token: Option<NextPageToken>,
336 ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
337 let mut conn = self.pool.get().await?;
338 let tx = conn.read_only_transaction().await?;
339
340 let (jobs, next_page_token) = query::jobs::job_details(
341 &tx,
342 filters,
343 order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
344 page_size,
345 page_token.map(|t| t.0),
346 )
347 .await?;
348
349 tx.commit().await?;
350
351 Ok((jobs, next_page_token))
352 }
353
354 async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
355 let mut conn = self.pool.get().await?;
356 let tx = conn.read_only_transaction().await?;
357 let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
358 tx.commit().await?;
359 Ok(count)
360 }
361
362 async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
363 let mut conn = self.pool.get().await?;
364 let tx = conn.transaction().await?;
365 let jobs = cancel_jobs(&tx, filters).await?;
366
367 let job_ids = jobs.iter().map(|j| j.job_id).collect::<Vec<_>>();
368
369 mark_jobs_inactive(&tx, &job_ids).await?;
370
371 tx.commit().await?;
372
373 Ok(jobs)
374 }
375
376 async fn add_schedules(
377 &self,
378 schedules: &[ScheduleDefinition],
379 if_not_exists: Option<ScheduleFilters>,
380 ) -> Result<AddedSchedules> {
381 if schedules.is_empty() {
382 return Ok(AddedSchedules::Added(Vec::new()));
383 }
384
385 let mut conn = self.pool.get().await?;
386 let tx = conn.transaction().await?;
387
388 if let Some(filters) = if_not_exists {
389 let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;
390
391 if !existing_schedule_ids.is_empty() {
392 tx.commit().await?;
393 return Ok(AddedSchedules::Existing(existing_schedule_ids));
394 }
395 }
396
397 let mut col_id = Vec::with_capacity(schedules.len());
398 let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
399 let mut col_job_template_json = Vec::with_capacity(schedules.len());
400 let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
401 let mut col_start_after = Vec::with_capacity(schedules.len());
402 let mut col_end_before = Vec::with_capacity(schedules.len());
403
404 for schedule in schedules {
405 let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);
406
407 col_id.push(Uuid::now_v7());
408 col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
409 col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
410 col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
411 col_start_after.push(start_after);
412 col_end_before.push(end_before);
413 }
414
415 {
416 let stmt = tx
417 .prepare(
418 r#"--sql
419 INSERT INTO ora.schedule (
420 id,
421 job_template_job_type_id,
422 job_template_json,
423 scheduling_policy_json,
424 start_after,
425 end_before
426 ) SELECT * FROM UNNEST(
427 $1::UUID[],
428 $2::TEXT[],
429 $3::TEXT[],
430 $4::TEXT[],
431 $5::TIMESTAMPTZ[],
432 $6::TIMESTAMPTZ[]
433 )
434 "#,
435 )
436 .await?;
437
438 tx.execute(
439 &stmt,
440 &[
441 &col_id,
442 &col_job_template_job_type_id,
443 &col_job_template_json,
444 &col_scheduling_policy_json,
445 &col_start_after,
446 &col_end_before,
447 ],
448 )
449 .await?;
450 }
451
452 tx.commit().await?;
453
454 let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();
455
456 Ok(AddedSchedules::Added(schedule_ids))
457 }
458
459 async fn list_schedules(
460 &self,
461 filters: ScheduleFilters,
462 order_by: Option<ScheduleOrderBy>,
463 page_size: u32,
464 page_token: Option<NextPageToken>,
465 ) -> Result<(
466 Vec<ScheduleDetails>,
467 Option<ora_backend::common::NextPageToken>,
468 )> {
469 let mut conn = self.pool.get().await?;
470 let tx = conn.read_only_transaction().await?;
471
472 let (schedules, next_page_token) = query::schedules::schedule_details(
473 &tx,
474 filters,
475 order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
476 page_size,
477 page_token.map(|t| t.0),
478 )
479 .await?;
480
481 tx.commit().await?;
482
483 Ok((schedules, next_page_token))
484 }
485
486 async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
487 let mut conn = self.pool.get().await?;
488 let tx = conn.read_only_transaction().await?;
489 let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
490 tx.commit().await?;
491 Ok(count)
492 }
493
494 async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
495 let mut conn = self.pool.get().await?;
496 let tx = conn.transaction().await?;
497 let schedules = query::schedules::stop_schedules(&tx, filters).await?;
498 tx.commit().await?;
499 Ok(schedules)
500 }
501
502 fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
503 async_stream::try_stream!({
504 let mut last_execution_id: Option<ExecutionId> = None;
505
506 loop {
507 let mut conn = self.pool.get().await?;
508 let tx = conn.read_only_transaction().await?;
509
510 let rows = if let Some(last_execution_id) = last_execution_id {
511 let stmt = tx
512 .prepare(
513 r#"--sql
514 SELECT
515 ora.execution.id,
516 ora.job.id,
517 ora.job.job_type_id,
518 ora.job.input_payload_json,
519 (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
520 ora.job.retry_policy_json,
521 EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
522 FROM
523 ora.execution
524 JOIN ora.job ON
525 ora.execution.job_id = ora.job.id
526 WHERE
527 status = 0
528 AND ora.job.target_execution_time <= NOW()
529 AND ora.execution.id > $1::UUID
530 ORDER BY
531 ora.execution.id ASC
532 LIMIT 1000
533 "#,
534 )
535 .await?;
536
537 tx.query(&stmt, &[&last_execution_id.0]).await?
538 } else {
539 let stmt = tx
540 .prepare(
541 r#"--sql
542 SELECT
543 ora.execution.id,
544 ora.job.id,
545 ora.job.job_type_id,
546 ora.job.input_payload_json,
547 (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
548 ora.job.retry_policy_json,
549 EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
550 FROM
551 ora.execution
552 JOIN ora.job ON
553 ora.execution.job_id = ora.job.id
554 WHERE
555 status = 0
556 AND ora.job.target_execution_time <= NOW()
557 ORDER BY
558 ora.execution.id ASC
559 LIMIT 1000
560 "#,
561 )
562 .await?;
563
564 tx.query(&stmt, &[]).await?
565 };
566
567 tx.commit().await?;
568
569 if rows.is_empty() {
570 break;
571 }
572
573 let mut ready_executions = Vec::with_capacity(rows.len());
574
575 for row in rows {
576 ready_executions.push(ReadyExecution {
577 execution_id: ExecutionId(row.try_get(0)?),
578 job_id: JobId(row.try_get(1)?),
579 job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
580 input_payload_json: row.try_get(3)?,
581 attempt_number: row.try_get::<_, i64>(4)? as u64,
582 retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
583 target_execution_time: UNIX_EPOCH
584 + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
585 });
586 }
587
588 last_execution_id = ready_executions.last().map(|e| e.execution_id);
589
590 yield ready_executions;
591 }
592 })
593 }
594
595 async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
596 let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();
597
598 loop {
599 let mut conn = self.pool.get().await?;
600
601 let next_timestamp = {
602 let tx = conn.read_only_transaction().await?;
603
604 let stmt = tx
605 .prepare(
606 r#"--sql
607 SELECT
608 EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
609 FROM
610 ora.job
611 JOIN ora.execution ON
612 ora.execution.job_id = ora.job.id
613 WHERE
614 ora.execution.status = 0
615 AND NOT (ora.execution.id = ANY($1::UUID[]))
616 ORDER BY
617 ora.job.target_execution_time ASC
618 LIMIT 1
619 "#,
620 )
621 .await?;
622
623 let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;
624
625 tx.commit().await?;
626
627 match row {
628 Some(row) => row
629 .try_get::<_, Option<f64>>(0)?
630 .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
631 None => None,
632 }
633 };
634
635 drop(conn);
636
637 if let Some(next_timestamp) = next_timestamp {
638 let now = std::time::SystemTime::now();
639
640 if next_timestamp <= now {
641 break;
642 }
643
644 tokio::time::sleep(
645 next_timestamp
646 .duration_since(now)
647 .unwrap_or_else(|_| Duration::from_secs(0)),
648 )
649 .await;
650 break;
651 }
652
653 tokio::time::sleep(Duration::from_millis(500)).await;
654 }
655
656 Ok(())
657 }
658
659 fn in_progress_executions(
660 &self,
661 ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
662 async_stream::try_stream!({
663 let mut last_execution_id: Option<ExecutionId> = None;
664
665 loop {
666 let mut conn = self.pool.get().await?;
667 let tx = conn.read_only_transaction().await?;
668
669 let rows = if let Some(last_execution_id) = last_execution_id {
670 let stmt = tx
671 .prepare(
672 r#"--sql
673 SELECT
674 ora.execution.id,
675 ora.job.id,
676 ora.execution.executor_id,
677 EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
678 EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
679 ora.job.timeout_policy_json,
680 ora.job.retry_policy_json,
681 (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
682 FROM
683 ora.execution
684 JOIN ora.job ON
685 ora.execution.job_id = ora.job.id
686 WHERE
687 status = 1
688 AND ora.execution.id > $1::UUID
689 ORDER BY
690 ora.execution.id ASC
691 LIMIT 1000
692 "#,
693 )
694 .await?;
695
696 tx.query(&stmt, &[&last_execution_id.0]).await?
697 } else {
698 let stmt = tx
699 .prepare(
700 r#"--sql
701 SELECT
702 ora.execution.id,
703 ora.job.id,
704 ora.execution.executor_id,
705 EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
706 EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
707 ora.job.timeout_policy_json,
708 ora.job.retry_policy_json,
709 (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
710 FROM
711 ora.execution
712 JOIN ora.job ON
713 ora.execution.job_id = ora.job.id
714 WHERE
715 status = 1
716 ORDER BY
717 ora.execution.id ASC
718 LIMIT 1000
719 "#,
720 )
721 .await?;
722
723 tx.query(&stmt, &[]).await?
724 };
725
726 tx.commit().await?;
727
728 if rows.is_empty() {
729 break;
730 }
731
732 let mut in_progress_executions = Vec::with_capacity(rows.len());
733
734 for row in rows {
735 in_progress_executions.push(InProgressExecution {
736 execution_id: ExecutionId(row.try_get(0)?),
737 job_id: JobId(row.try_get(1)?),
738 executor_id: ExecutorId(row.try_get(2)?),
739 target_execution_time: UNIX_EPOCH
740 + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
741 started_at: UNIX_EPOCH
742 + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
743 timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
744 retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
745 attempt_number: row.try_get::<_, i64>(7)? as u64,
746 });
747 }
748
749 last_execution_id = in_progress_executions.last().map(|e| e.execution_id);
750
751 yield in_progress_executions;
752 }
753 })
754 }
755
756 async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
757 if executions.is_empty() {
758 return Ok(());
759 }
760
761 let mut conn = self.pool.get().await?;
762
763 let tx = conn.transaction().await?;
764
765 let stmt = tx
766 .prepare(
767 r#"--sql
768 UPDATE ora.execution
769 SET
770 executor_id = t.executor_id,
771 started_at = to_timestamp(t.started_at)
772 FROM UNNEST(
773 $1::UUID[],
774 $2::UUID[],
775 $3::DOUBLE PRECISION[]
776 ) AS t(execution_id, executor_id, started_at)
777 WHERE
778 execution_id = id
779 AND ora.execution.started_at IS NULL
780 "#,
781 )
782 .await?;
783
784 let mut col_execution_id = Vec::with_capacity(executions.len());
785 let mut col_executor_id = Vec::with_capacity(executions.len());
786 let mut col_started_at = Vec::with_capacity(executions.len());
787
788 for execution in executions {
789 col_execution_id.push(execution.execution_id.0);
790 col_executor_id.push(execution.executor_id.0);
791 col_started_at.push(
792 execution
793 .started_at
794 .duration_since(UNIX_EPOCH)
795 .unwrap_or_default()
796 .as_secs_f64(),
797 );
798 }
799
800 tx.execute(
801 &stmt,
802 &[&col_execution_id, &col_executor_id, &col_started_at],
803 )
804 .await?;
805
806 tx.commit().await?;
807
808 Ok(())
809 }
810
811 async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
812 if executions.is_empty() {
813 return Ok(());
814 }
815
816 let mut conn = self.pool.get().await?;
817
818 let tx = conn.transaction().await?;
819
820 let stmt = tx
821 .prepare(
822 r#"--sql
823 UPDATE ora.execution
824 SET
825 succeeded_at = to_timestamp(t.succeeded_at),
826 output_json = t.output_json
827 FROM UNNEST(
828 $1::UUID[],
829 $2::DOUBLE PRECISION[],
830 $3::TEXT[]
831 ) AS t(execution_id, succeeded_at, output_json)
832 WHERE
833 execution_id = id
834 AND status < 2
835 RETURNING job_id
836 "#,
837 )
838 .await?;
839
840 let mut col_execution_id = Vec::with_capacity(executions.len());
841 let mut col_succeeded_at = Vec::with_capacity(executions.len());
842 let mut col_output_json = Vec::with_capacity(executions.len());
843
844 for execution in executions {
845 col_execution_id.push(execution.execution_id.0);
846 col_succeeded_at.push(
847 execution
848 .succeeded_at
849 .duration_since(UNIX_EPOCH)
850 .unwrap_or_default()
851 .as_secs_f64(),
852 );
853 col_output_json.push(execution.output_json.as_str());
854 }
855
856 let rows = tx
857 .query(
858 &stmt,
859 &[&col_execution_id, &col_succeeded_at, &col_output_json],
860 )
861 .await?;
862
863 let mut job_ids = Vec::with_capacity(rows.len());
864 for row in rows {
865 job_ids.push(JobId(row.try_get(0)?));
866 }
867
868 mark_jobs_inactive(&tx, &job_ids).await?;
869
870 tx.commit().await?;
871
872 Ok(())
873 }
874
875 async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
876 if executions.is_empty() {
877 return Ok(());
878 }
879
880 let mut conn = self.pool.get().await?;
881 let tx = conn.transaction().await?;
882
883 let jobs = executions_failed(&tx, executions).await?;
884
885 mark_jobs_inactive(&tx, &jobs).await?;
886
887 tx.commit().await?;
888
889 Ok(())
890 }
891
892 async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
893 if executions.is_empty() {
894 return Ok(());
895 }
896
897 let mut conn = self.pool.get().await?;
898 let tx = conn.transaction().await?;
899
900 let job_ids = executions_failed(&tx, executions).await?;
901 add_executions(&tx, &job_ids).await?;
902
903 tx.commit().await?;
904
905 Ok(())
906 }
907
908 fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
909 async_stream::try_stream!({
910 let mut last_schedule_id: Option<ScheduleId> = None;
911 loop {
912 let mut conn = self.pool.get().await?;
913 let tx = conn.read_only_transaction().await?;
914
915 let rows = if let Some(last_schedule_id) = last_schedule_id {
916 let stmt = tx
917 .prepare(
918 r#"--sql
919 SELECT
920 id,
921 EXTRACT(EPOCH FROM(
922 SELECT
923 target_execution_time
924 FROM
925 ora.job
926 WHERE
927 ora.job.schedule_id = ora.schedule.id
928 ORDER BY ora.job.id DESC
929 LIMIT 1
930 ))::DOUBLE PRECISION,
931 scheduling_policy_json,
932 EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
933 EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
934 job_template_json
935 FROM
936 ora.schedule
937 WHERE
938 (
939 ora.schedule.stopped_at IS NULL
940 AND ora.schedule.active_job_id IS NULL
941 )
942 AND id > $1::UUID
943 ORDER BY id ASC
944 LIMIT 1000
945 "#,
946 )
947 .await?;
948
949 tx.query(&stmt, &[&last_schedule_id.0]).await?
950 } else {
951 let stmt = tx
952 .prepare(
953 r#"--sql
954 SELECT
955 id,
956 EXTRACT(EPOCH FROM(
957 SELECT
958 target_execution_time
959 FROM
960 ora.job
961 WHERE
962 ora.job.schedule_id = ora.schedule.id
963 ORDER BY ora.job.id DESC
964 LIMIT 1
965 ))::DOUBLE PRECISION,
966 scheduling_policy_json,
967 EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
968 EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
969 job_template_json
970 FROM
971 ora.schedule
972 WHERE
973 ora.schedule.stopped_at IS NULL
974 AND ora.schedule.active_job_id IS NULL
975 ORDER BY id ASC
976 LIMIT 1000
977 "#,
978 )
979 .await?;
980
981 tx.query(&stmt, &[]).await?
982 };
983
984 tx.commit().await?;
985
986 if rows.is_empty() {
987 break;
988 }
989
990 let mut schedules = Vec::with_capacity(rows.len());
991
992 for row in rows {
993 schedules.push(PendingSchedule {
994 schedule_id: ScheduleId(row.try_get(0)?),
995 last_target_execution_time: row
996 .try_get::<_, Option<f64>>(1)?
997 .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
998 scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
999 time_range: TimeRange {
1000 start: row
1001 .try_get::<_, Option<f64>>(3)?
1002 .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1003 end: row
1004 .try_get::<_, Option<f64>>(4)?
1005 .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1006 },
1007 job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
1008 });
1009 }
1010
1011 last_schedule_id = schedules.last().map(|s| s.schedule_id);
1012 yield schedules;
1013 }
1014 })
1015 }
1016
1017 async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1018 let mut conn = self.pool.get().await?;
1019
1020 loop {
1023 let mut deleted_rows = 0;
1024
1025 {
1026 let tx = conn.transaction().await?;
1027
1028 let stmt = tx
1029 .prepare(
1030 r#"--sql
1031 DELETE FROM ora.job
1032 WHERE id IN (
1033 SELECT ora.job.id
1034 FROM ora.job
1035 JOIN LATERAL (
1036 SELECT
1037 status,
1038 succeeded_at,
1039 failed_at,
1040 cancelled_at
1041 FROM
1042 ora.execution
1043 WHERE
1044 ora.execution.job_id = ora.job.id
1045 ORDER BY ora.execution.id DESC
1046 LIMIT 1
1047 ) e ON TRUE
1048 WHERE
1049 ora.job.inactive
1050 AND (
1051 e.succeeded_at < to_timestamp($1)
1052 OR e.failed_at < to_timestamp($1)
1053 OR e.cancelled_at < to_timestamp($1)
1054 )
1055 LIMIT 25
1056 FOR UPDATE SKIP LOCKED
1057 );
1058 "#,
1059 )
1060 .await?;
1061
1062 let deleted = tx
1063 .execute(
1064 &stmt,
1065 &[&before
1066 .duration_since(UNIX_EPOCH)
1067 .unwrap_or_default()
1068 .as_secs_f64()],
1069 )
1070 .await?;
1071
1072 deleted_rows += deleted;
1073
1074 tx.commit().await?;
1075 }
1076
1077 {
1078 let tx = conn.transaction().await?;
1079 let stmt = tx
1080 .prepare(
1081 r#"--sql
1082 DELETE FROM ONLY ora.schedule
1083 WHERE ctid IN (
1084 SELECT ctid
1085 FROM ora.schedule
1086 WHERE
1087 ora.schedule.stopped_at < to_timestamp($1)
1088 LIMIT 25
1089 FOR UPDATE SKIP LOCKED
1090 )
1091 "#,
1092 )
1093 .await?;
1094
1095 let deleted = tx
1096 .execute(
1097 &stmt,
1098 &[&before
1099 .duration_since(UNIX_EPOCH)
1100 .unwrap_or_default()
1101 .as_secs_f64()],
1102 )
1103 .await?;
1104
1105 tx.commit().await?;
1106
1107 deleted_rows += deleted;
1108 }
1109
1110 {
1111 let tx = conn.transaction().await?;
1112
1113 let stmt = tx
1114 .prepare(
1115 r#"--sql
1116 DELETE FROM ora.job_type
1117 WHERE
1118 NOT EXISTS (
1119 SELECT FROM ora.job
1120 WHERE
1121 ora.job.job_type_id = ora.job_type.id
1122 )
1123 AND NOT EXISTS (
1124 SELECT FROM ora.schedule
1125 WHERE
1126 ora.schedule.job_template_job_type_id = ora.job_type.id
1127 )
1128 "#,
1129 )
1130 .await?;
1131
1132 tx.execute(&stmt, &[]).await?;
1133
1134 tx.commit().await?;
1135 }
1136
1137 if deleted_rows == 0 {
1138 break;
1139 }
1140 }
1141
1142 Ok(())
1143 }
1144
1145 async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1146 loop {
1147 let mut conn = self.pool.get().await?;
1148
1149 let has_pending = {
1150 let tx = conn.read_only_transaction().await?;
1151
1152 let stmt = tx
1153 .prepare(
1154 r#"--sql
1155 SELECT 1 FROM ora.schedule
1156 WHERE
1157 ora.schedule.stopped_at IS NULL
1158 AND ora.schedule.active_job_id IS NULL
1159 LIMIT 1
1160 "#,
1161 )
1162 .await?;
1163
1164 let row = tx.query_opt(&stmt, &[]).await?;
1165
1166 tx.commit().await?;
1167
1168 row.is_some()
1169 };
1170
1171 drop(conn);
1172
1173 if has_pending {
1174 break;
1175 }
1176
1177 tokio::time::sleep(Duration::from_millis(500)).await;
1178 }
1179
1180 Ok(())
1181 }
1182}
1183
1184async fn executions_failed(
1185 tx: &DbTransaction<'_>,
1186 executions: &[FailedExecution],
1187) -> Result<Vec<JobId>> {
1188 let stmt = tx
1189 .prepare(
1190 r#"--sql
1191 UPDATE ora.execution
1192 SET
1193 failed_at = to_timestamp(t.failed_at),
1194 failure_reason = t.failure_reason
1195 FROM UNNEST(
1196 $1::UUID[],
1197 $2::DOUBLE PRECISION[],
1198 $3::TEXT[]
1199 ) AS t(execution_id, failed_at, failure_reason)
1200 WHERE
1201 execution_id = id
1202 AND status < 2
1203 RETURNING job_id;
1204 "#,
1205 )
1206 .await?;
1207
1208 let mut col_execution_id = Vec::with_capacity(executions.len());
1209 let mut col_succeeded_at = Vec::with_capacity(executions.len());
1210 let mut col_failure_reason = Vec::with_capacity(executions.len());
1211
1212 for execution in executions {
1213 col_execution_id.push(execution.execution_id.0);
1214 col_succeeded_at.push(
1215 execution
1216 .failed_at
1217 .duration_since(UNIX_EPOCH)
1218 .unwrap_or_default()
1219 .as_secs_f64(),
1220 );
1221 col_failure_reason.push(execution.failure_reason.as_str());
1222 }
1223
1224 let rows = tx
1225 .query(
1226 &stmt,
1227 &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1228 )
1229 .await?;
1230
1231 let job_ids = rows.into_iter().map(|row| JobId(row.get(0))).collect();
1232
1233 Ok(job_ids)
1234}
1235
1236async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1237 if jobs.is_empty() {
1238 return Ok(());
1239 }
1240
1241 let mut col_job_id = Vec::with_capacity(jobs.len());
1242 let mut col_execution_id = Vec::with_capacity(jobs.len());
1243
1244 for job in jobs {
1245 col_job_id.push(job.0);
1246 col_execution_id.push(Uuid::now_v7());
1247 }
1248
1249 let stmt = tx
1250 .prepare(
1251 r#"--sql
1252 INSERT INTO ora.execution (
1253 id,
1254 job_id
1255 ) SELECT * FROM UNNEST(
1256 $1::UUID[],
1257 $2::UUID[]
1258 )
1259 "#,
1260 )
1261 .await?;
1262
1263 tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1264
1265 Ok(())
1266}
1267
1268async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1269 if jobs.is_empty() {
1270 return Ok(());
1271 }
1272
1273 let mut col_job_id = Vec::with_capacity(jobs.len());
1274
1275 for job in jobs {
1276 col_job_id.push(job.0);
1277 }
1278
1279 {
1280 let stmt = tx
1281 .prepare(
1282 r#"--sql
1283 UPDATE ora.job
1284 SET inactive = TRUE
1285 WHERE id = ANY($1::UUID[])
1286 "#,
1287 )
1288 .await?;
1289
1290 tx.execute(&stmt, &[&col_job_id]).await?;
1291 }
1292
1293 {
1294 let stmt = tx
1295 .prepare(
1296 r#"--sql
1297 UPDATE ora.schedule
1298 SET
1299 active_job_id = NULL
1300 WHERE active_job_id = ANY($1::UUID[])
1301 "#,
1302 )
1303 .await?;
1304
1305 tx.execute(&stmt, &[&col_job_id]).await?;
1306 }
1307
1308 Ok(())
1309}