1#![allow(missing_docs)]
2
3use std::time::{Duration, UNIX_EPOCH};
4
5use deadpool_postgres::{Pool, PoolError};
6use futures::Stream;
7use ora_backend::{
8 Backend,
9 common::{NextPageToken, TimeRange},
10 executions::{
11 ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
12 SucceededExecution,
13 },
14 executors::ExecutorId,
15 jobs::{CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId, NewJob},
16 schedules::{
17 PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters, ScheduleId,
18 ScheduleOrderBy, StoppedSchedule,
19 },
20};
21
22use refinery::embed_migrations;
23use thiserror::Error;
24use uuid::Uuid;
25
26use crate::{
27 db::{DbPool, DbTransaction},
28 query::{
29 jobs::{cancel_jobs, job_count, job_exists},
30 schedules::{schedule_count, schedule_exists},
31 },
32};
33
34embed_migrations!("./migrations");
35
36mod db;
37mod models;
38mod query;
39
40pub struct PostgresBackend {
41 pool: DbPool,
42}
43
44type Result<T> = core::result::Result<T, Error>;
45
46#[derive(Error, Debug)]
48pub enum Error {
49 #[error("{0}")]
50 Migrations(#[from] refinery::Error),
51 #[error("{0}")]
52 Pool(#[from] PoolError),
53 #[error("{0}")]
54 Postgres(#[from] tokio_postgres::Error),
55 #[error("{0}")]
56 Serde(#[from] serde_json::Error),
57 #[error("invalid page token: {0}")]
58 InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
59}
60
61impl PostgresBackend {
62 pub async fn new(pool: Pool) -> Result<Self> {
65 let mut conn = pool.get().await?;
66 conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
67 migrations::runner()
68 .set_migration_table_name("ora.migrations")
69 .run_async(&mut **conn)
70 .await?;
71 Ok(Self { pool: DbPool(pool) })
72 }
73}
74
75impl Backend for PostgresBackend {
76 type Error = Error;
77
78 async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
79 if job_types.is_empty() {
80 return Ok(());
81 }
82
83 let mut conn = self.pool.get().await?;
84
85 let tx = conn.transaction().await?;
86
87 let mut col_id = Vec::with_capacity(job_types.len());
88 let mut col_description = Vec::with_capacity(job_types.len());
89 let mut col_input_schema_json = Vec::with_capacity(job_types.len());
90 let mut col_output_schema_json = Vec::with_capacity(job_types.len());
91
92 for job_type in job_types {
93 col_id.push(job_type.id.as_str());
94 col_description.push(job_type.description.as_deref());
95 col_input_schema_json.push(job_type.input_schema_json.as_deref());
96 col_output_schema_json.push(job_type.output_schema_json.as_deref());
97 }
98
99 {
100 let stmt = tx
101 .prepare(
102 r#"--sql
103 INSERT INTO ora.job_type (
104 id,
105 description,
106 input_schema_json,
107 output_schema_json
108 ) SELECT * FROM UNNEST(
109 $1::TEXT[],
110 $2::TEXT[],
111 $3::TEXT[],
112 $4::TEXT[]
113 ) ON CONFLICT (id) DO UPDATE SET
114 description = EXCLUDED.description,
115 input_schema_json = EXCLUDED.input_schema_json,
116 output_schema_json = EXCLUDED.output_schema_json
117 WHERE
118 ora.job_type.description IS DISTINCT FROM EXCLUDED.description
119 OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
120 OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
121 "#,
122 )
123 .await?;
124
125 tx.execute(
126 &stmt,
127 &[
128 &col_id,
129 &col_description,
130 &col_input_schema_json,
131 &col_output_schema_json,
132 ],
133 )
134 .await?;
135 }
136
137 tx.commit().await?;
138
139 Ok(())
140 }
141
142 async fn list_job_types(&self) -> Result<Vec<JobType>> {
143 let mut conn = self.pool.get().await?;
144
145 let tx = conn.read_only_transaction().await?;
146
147 let job_types = {
148 let stmt = tx
149 .prepare(
150 r#"--sql
151 SELECT
152 id,
153 description,
154 input_schema_json,
155 output_schema_json
156 FROM
157 ora.job_type
158 ORDER BY id
159 "#,
160 )
161 .await?;
162
163 let rows = tx.query(&stmt, &[]).await?;
164
165 rows.into_iter()
166 .map(|row| {
167 Result::<_>::Ok(JobType {
168 id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
169 description: row.try_get(1)?,
170 input_schema_json: row.try_get(2)?,
171 output_schema_json: row.try_get(3)?,
172 })
173 })
174 .collect::<Result<Vec<_>>>()?
175 };
176
177 tx.commit().await?;
178
179 Ok(job_types)
180 }
181
182 async fn add_jobs(
183 &self,
184 jobs: &[NewJob],
185 if_not_exists: Option<JobFilters>,
186 ) -> Result<Vec<JobId>> {
187 if jobs.is_empty() {
188 return Ok(Vec::new());
189 }
190
191 let mut conn = self.pool.get().await?;
192
193 let tx = conn.transaction().await?;
194
195 if let Some(filters) = if_not_exists
196 && job_exists(&tx, filters).await?
197 {
198 tx.commit().await?;
199 return Ok(Vec::new());
200 }
201
202 let mut col_id = Vec::with_capacity(jobs.len());
203 let mut col_schedule_id = Vec::with_capacity(jobs.len());
204
205 {
206 let mut col_job_type_id = Vec::with_capacity(jobs.len());
207 let mut col_target_execution_time = Vec::with_capacity(jobs.len());
208 let mut col_input_payload_json = Vec::with_capacity(jobs.len());
209 let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
210 let mut col_retry_policy_json = Vec::with_capacity(jobs.len());
211
212 for job in jobs {
213 col_id.push(Uuid::now_v7());
214 col_job_type_id.push(job.job.job_type_id.as_str());
215 col_target_execution_time.push(job.job.target_execution_time);
216 col_input_payload_json.push(job.job.input_payload_json.as_str());
217 col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
218 col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
219 col_schedule_id.push(job.schedule_id.map(|s| s.0));
220 }
221
222 let stmt = tx
223 .prepare(
224 r#"--sql
225 INSERT INTO ora.job (
226 id,
227 job_type_id,
228 target_execution_time,
229 input_payload_json,
230 timeout_policy_json,
231 retry_policy_json,
232 schedule_id
233 ) SELECT * FROM UNNEST(
234 $1::UUID[],
235 $2::TEXT[],
236 $3::TIMESTAMPTZ[],
237 $4::TEXT[],
238 $5::TEXT[],
239 $6::TEXT[],
240 $7::UUID[]
241 )
242 "#,
243 )
244 .await?;
245
246 tx.execute(
247 &stmt,
248 &[
249 &col_id,
250 &col_job_type_id,
251 &col_target_execution_time,
252 &col_input_payload_json,
253 &col_timeout_policy_json,
254 &col_retry_policy_json,
255 &col_schedule_id,
256 ],
257 )
258 .await?;
259 }
260
261 {
262 let mut col_job_id = Vec::with_capacity(jobs.len());
263 let mut col_job_label_key = Vec::with_capacity(jobs.len());
264 let mut col_job_label_value = Vec::with_capacity(jobs.len());
265
266 for (i, job) in jobs.iter().enumerate() {
267 for label in &job.job.labels {
268 col_job_id.push(col_id[i]);
269 col_job_label_key.push(label.key.as_str());
270 col_job_label_value.push(label.value.as_str());
271 }
272 }
273
274 let stmt = tx
275 .prepare(
276 r#"--sql
277 INSERT INTO ora.job_label (
278 job_id,
279 label_key,
280 label_value
281 ) SELECT * FROM UNNEST(
282 $1::UUID[],
283 $2::TEXT[],
284 $3::TEXT[]
285 )
286 "#,
287 )
288 .await?;
289
290 tx.execute(
291 &stmt,
292 &[&col_job_id, &col_job_label_key, &col_job_label_value],
293 )
294 .await?;
295 }
296
297 {
298 let stmt = tx
299 .prepare(
300 r#"--sql
301 UPDATE ora.schedule
302 SET
303 active_job_id = t.job_id
304 FROM UNNEST(
305 $1::UUID[],
306 $2::UUID[]
307 ) AS t(job_id, schedule_id)
308 WHERE
309 ora.schedule.id = t.schedule_id
310 "#,
311 )
312 .await?;
313
314 tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
315 }
316
317 let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
318 add_executions(&tx, &job_ids).await?;
319
320 tx.commit().await?;
321
322 Ok(job_ids)
323 }
324
325 async fn list_jobs(
326 &self,
327 filters: JobFilters,
328 order_by: Option<JobOrderBy>,
329 page_size: u32,
330 page_token: Option<NextPageToken>,
331 ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
332 let mut conn = self.pool.get().await?;
333 let tx = conn.read_only_transaction().await?;
334
335 let (jobs, next_page_token) = query::jobs::job_details(
336 &tx,
337 filters,
338 order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
339 page_size,
340 page_token.map(|t| t.0),
341 )
342 .await?;
343
344 tx.commit().await?;
345
346 Ok((jobs, next_page_token))
347 }
348
349 async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
350 let mut conn = self.pool.get().await?;
351 let tx = conn.read_only_transaction().await?;
352 let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
353 tx.commit().await?;
354 Ok(count)
355 }
356
357 async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
358 let mut conn = self.pool.get().await?;
359 let tx = conn.transaction().await?;
360 let jobs = cancel_jobs(&tx, filters).await?;
361
362 let job_ids = jobs.iter().map(|j| j.job_id).collect::<Vec<_>>();
363
364 mark_jobs_inactive(&tx, &job_ids).await?;
365
366 tx.commit().await?;
367
368 Ok(jobs)
369 }
370
371 async fn add_schedules(
372 &self,
373 schedules: &[ScheduleDefinition],
374 if_not_exists: Option<ScheduleFilters>,
375 ) -> Result<Vec<ScheduleId>> {
376 if schedules.is_empty() {
377 return Ok(Vec::new());
378 }
379
380 let mut conn = self.pool.get().await?;
381 let tx = conn.transaction().await?;
382
383 if let Some(filters) = if_not_exists
384 && schedule_exists(&tx, filters).await?
385 {
386 tx.commit().await?;
387 return Ok(Vec::new());
388 }
389
390 let mut col_id = Vec::with_capacity(schedules.len());
391 let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
392 let mut col_job_template_json = Vec::with_capacity(schedules.len());
393 let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
394 let mut col_start_after = Vec::with_capacity(schedules.len());
395 let mut col_end_before = Vec::with_capacity(schedules.len());
396
397 for schedule in schedules {
398 let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);
399
400 col_id.push(Uuid::now_v7());
401 col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
402 col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
403 col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
404 col_start_after.push(start_after);
405 col_end_before.push(end_before);
406 }
407
408 {
409 let stmt = tx
410 .prepare(
411 r#"--sql
412 INSERT INTO ora.schedule (
413 id,
414 job_template_job_type_id,
415 job_template_json,
416 scheduling_policy_json,
417 start_after,
418 end_before
419 ) SELECT * FROM UNNEST(
420 $1::UUID[],
421 $2::TEXT[],
422 $3::TEXT[],
423 $4::TEXT[],
424 $5::TIMESTAMPTZ[],
425 $6::TIMESTAMPTZ[]
426 )
427 "#,
428 )
429 .await?;
430
431 tx.execute(
432 &stmt,
433 &[
434 &col_id,
435 &col_job_template_job_type_id,
436 &col_job_template_json,
437 &col_scheduling_policy_json,
438 &col_start_after,
439 &col_end_before,
440 ],
441 )
442 .await?;
443 }
444
445 tx.commit().await?;
446
447 let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();
448
449 Ok(schedule_ids)
450 }
451
452 async fn list_schedules(
453 &self,
454 filters: ScheduleFilters,
455 order_by: Option<ScheduleOrderBy>,
456 page_size: u32,
457 page_token: Option<NextPageToken>,
458 ) -> Result<(
459 Vec<ScheduleDetails>,
460 Option<ora_backend::common::NextPageToken>,
461 )> {
462 let mut conn = self.pool.get().await?;
463 let tx = conn.read_only_transaction().await?;
464
465 let (schedules, next_page_token) = query::schedules::schedule_details(
466 &tx,
467 filters,
468 order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
469 page_size,
470 page_token.map(|t| t.0),
471 )
472 .await?;
473
474 tx.commit().await?;
475
476 Ok((schedules, next_page_token))
477 }
478
479 async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
480 let mut conn = self.pool.get().await?;
481 let tx = conn.read_only_transaction().await?;
482 let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
483 tx.commit().await?;
484 Ok(count)
485 }
486
487 async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
488 let mut conn = self.pool.get().await?;
489 let tx = conn.transaction().await?;
490 let schedules = query::schedules::stop_schedules(&tx, filters).await?;
491 tx.commit().await?;
492 Ok(schedules)
493 }
494
495 fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
496 async_stream::try_stream!({
497 let mut last_execution_id: Option<ExecutionId> = None;
498
499 loop {
500 let mut conn = self.pool.get().await?;
501 let tx = conn.read_only_transaction().await?;
502
503 let rows = if let Some(last_execution_id) = last_execution_id {
504 let stmt = tx
505 .prepare(
506 r#"--sql
507 SELECT
508 ora.execution.id,
509 ora.job.id,
510 ora.job.job_type_id,
511 ora.job.input_payload_json,
512 (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
513 ora.job.retry_policy_json,
514 EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
515 FROM
516 ora.execution
517 JOIN ora.job ON
518 ora.execution.job_id = ora.job.id
519 WHERE
520 status = 0
521 AND ora.job.target_execution_time <= NOW()
522 AND ora.execution.id > $1::UUID
523 ORDER BY
524 ora.execution.id ASC
525 LIMIT 1000
526 "#,
527 )
528 .await?;
529
530 tx.query(&stmt, &[&last_execution_id.0]).await?
531 } else {
532 let stmt = tx
533 .prepare(
534 r#"--sql
535 SELECT
536 ora.execution.id,
537 ora.job.id,
538 ora.job.job_type_id,
539 ora.job.input_payload_json,
540 (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
541 ora.job.retry_policy_json,
542 EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
543 FROM
544 ora.execution
545 JOIN ora.job ON
546 ora.execution.job_id = ora.job.id
547 WHERE
548 status = 0
549 AND ora.job.target_execution_time <= NOW()
550 ORDER BY
551 ora.execution.id ASC
552 LIMIT 1000
553 "#,
554 )
555 .await?;
556
557 tx.query(&stmt, &[]).await?
558 };
559
560 tx.commit().await?;
561
562 if rows.is_empty() {
563 break;
564 }
565
566 let mut ready_executions = Vec::with_capacity(rows.len());
567
568 for row in rows {
569 ready_executions.push(ReadyExecution {
570 execution_id: ExecutionId(row.try_get(0)?),
571 job_id: JobId(row.try_get(1)?),
572 job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
573 input_payload_json: row.try_get(3)?,
574 attempt_number: row.try_get::<_, i64>(4)? as u64,
575 retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
576 target_execution_time: UNIX_EPOCH
577 + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
578 });
579 }
580
581 last_execution_id = ready_executions.last().map(|e| e.execution_id);
582
583 yield ready_executions;
584 }
585 })
586 }
587
588 async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
589 let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();
590
591 loop {
592 let mut conn = self.pool.get().await?;
593
594 let next_timestamp = {
595 let tx = conn.read_only_transaction().await?;
596
597 let stmt = tx
598 .prepare(
599 r#"--sql
600 SELECT
601 EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
602 FROM
603 ora.job
604 JOIN ora.execution ON
605 ora.execution.job_id = ora.job.id
606 WHERE
607 ora.execution.status = 0
608 AND NOT (ora.execution.id = ANY($1::UUID[]))
609 ORDER BY
610 ora.job.target_execution_time ASC
611 LIMIT 1
612 "#,
613 )
614 .await?;
615
616 let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;
617
618 tx.commit().await?;
619
620 match row {
621 Some(row) => row
622 .try_get::<_, Option<f64>>(0)?
623 .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
624 None => None,
625 }
626 };
627
628 drop(conn);
629
630 if let Some(next_timestamp) = next_timestamp {
631 let now = std::time::SystemTime::now();
632
633 if next_timestamp <= now {
634 break;
635 }
636
637 tokio::time::sleep(
638 next_timestamp
639 .duration_since(now)
640 .unwrap_or_else(|_| Duration::from_secs(0)),
641 )
642 .await;
643 break;
644 }
645
646 tokio::time::sleep(Duration::from_millis(500)).await;
647 }
648
649 Ok(())
650 }
651
652 fn in_progress_executions(
653 &self,
654 ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
655 async_stream::try_stream!({
656 let mut last_execution_id: Option<ExecutionId> = None;
657
658 loop {
659 let mut conn = self.pool.get().await?;
660 let tx = conn.read_only_transaction().await?;
661
662 let rows = if let Some(last_execution_id) = last_execution_id {
663 let stmt = tx
664 .prepare(
665 r#"--sql
666 SELECT
667 ora.execution.id,
668 ora.job.id,
669 ora.execution.executor_id,
670 EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
671 EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
672 ora.job.timeout_policy_json,
673 ora.job.retry_policy_json,
674 (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
675 FROM
676 ora.execution
677 JOIN ora.job ON
678 ora.execution.job_id = ora.job.id
679 WHERE
680 status = 1
681 AND ora.execution.id > $1::UUID
682 ORDER BY
683 ora.execution.id ASC
684 LIMIT 1000
685 "#,
686 )
687 .await?;
688
689 tx.query(&stmt, &[&last_execution_id.0]).await?
690 } else {
691 let stmt = tx
692 .prepare(
693 r#"--sql
694 SELECT
695 ora.execution.id,
696 ora.job.id,
697 ora.execution.executor_id,
698 EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
699 EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
700 ora.job.timeout_policy_json,
701 ora.job.retry_policy_json,
702 (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
703 FROM
704 ora.execution
705 JOIN ora.job ON
706 ora.execution.job_id = ora.job.id
707 WHERE
708 status = 1
709 ORDER BY
710 ora.execution.id ASC
711 LIMIT 1000
712 "#,
713 )
714 .await?;
715
716 tx.query(&stmt, &[]).await?
717 };
718
719 tx.commit().await?;
720
721 if rows.is_empty() {
722 break;
723 }
724
725 let mut in_progress_executions = Vec::with_capacity(rows.len());
726
727 for row in rows {
728 in_progress_executions.push(InProgressExecution {
729 execution_id: ExecutionId(row.try_get(0)?),
730 job_id: JobId(row.try_get(1)?),
731 executor_id: ExecutorId(row.try_get(2)?),
732 target_execution_time: UNIX_EPOCH
733 + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
734 started_at: UNIX_EPOCH
735 + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
736 timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
737 retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
738 attempt_number: row.try_get::<_, i64>(7)? as u64,
739 });
740 }
741
742 last_execution_id = in_progress_executions.last().map(|e| e.execution_id);
743
744 yield in_progress_executions;
745 }
746 })
747 }
748
749 async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
750 if executions.is_empty() {
751 return Ok(());
752 }
753
754 let mut conn = self.pool.get().await?;
755
756 let tx = conn.transaction().await?;
757
758 let stmt = tx
759 .prepare(
760 r#"--sql
761 UPDATE ora.execution
762 SET
763 executor_id = t.executor_id,
764 started_at = to_timestamp(t.started_at)
765 FROM UNNEST(
766 $1::UUID[],
767 $2::UUID[],
768 $3::DOUBLE PRECISION[]
769 ) AS t(execution_id, executor_id, started_at)
770 WHERE
771 execution_id = id
772 AND ora.execution.started_at IS NULL
773 "#,
774 )
775 .await?;
776
777 let mut col_execution_id = Vec::with_capacity(executions.len());
778 let mut col_executor_id = Vec::with_capacity(executions.len());
779 let mut col_started_at = Vec::with_capacity(executions.len());
780
781 for execution in executions {
782 col_execution_id.push(execution.execution_id.0);
783 col_executor_id.push(execution.executor_id.0);
784 col_started_at.push(
785 execution
786 .started_at
787 .duration_since(UNIX_EPOCH)
788 .unwrap_or_default()
789 .as_secs_f64(),
790 );
791 }
792
793 tx.execute(
794 &stmt,
795 &[&col_execution_id, &col_executor_id, &col_started_at],
796 )
797 .await?;
798
799 tx.commit().await?;
800
801 Ok(())
802 }
803
804 async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
805 if executions.is_empty() {
806 return Ok(());
807 }
808
809 let mut conn = self.pool.get().await?;
810
811 let tx = conn.transaction().await?;
812
813 let stmt = tx
814 .prepare(
815 r#"--sql
816 UPDATE ora.execution
817 SET
818 succeeded_at = to_timestamp(t.succeeded_at),
819 output_json = t.output_json
820 FROM UNNEST(
821 $1::UUID[],
822 $2::DOUBLE PRECISION[],
823 $3::TEXT[]
824 ) AS t(execution_id, succeeded_at, output_json)
825 WHERE
826 execution_id = id
827 AND status < 2
828 RETURNING job_id
829 "#,
830 )
831 .await?;
832
833 let mut col_execution_id = Vec::with_capacity(executions.len());
834 let mut col_succeeded_at = Vec::with_capacity(executions.len());
835 let mut col_output_json = Vec::with_capacity(executions.len());
836
837 for execution in executions {
838 col_execution_id.push(execution.execution_id.0);
839 col_succeeded_at.push(
840 execution
841 .succeeded_at
842 .duration_since(UNIX_EPOCH)
843 .unwrap_or_default()
844 .as_secs_f64(),
845 );
846 col_output_json.push(execution.output_json.as_str());
847 }
848
849 let rows = tx
850 .query(
851 &stmt,
852 &[&col_execution_id, &col_succeeded_at, &col_output_json],
853 )
854 .await?;
855
856 let mut job_ids = Vec::with_capacity(rows.len());
857 for row in rows {
858 job_ids.push(JobId(row.try_get(0)?));
859 }
860
861 mark_jobs_inactive(&tx, &job_ids).await?;
862
863 tx.commit().await?;
864
865 Ok(())
866 }
867
868 async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
869 if executions.is_empty() {
870 return Ok(());
871 }
872
873 let mut conn = self.pool.get().await?;
874 let tx = conn.transaction().await?;
875
876 let jobs = executions_failed(&tx, executions).await?;
877
878 mark_jobs_inactive(&tx, &jobs).await?;
879
880 tx.commit().await?;
881
882 Ok(())
883 }
884
885 async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
886 if executions.is_empty() {
887 return Ok(());
888 }
889
890 let mut conn = self.pool.get().await?;
891 let tx = conn.transaction().await?;
892
893 let job_ids = executions_failed(&tx, executions).await?;
894 add_executions(&tx, &job_ids).await?;
895
896 tx.commit().await?;
897
898 Ok(())
899 }
900
901 fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
902 async_stream::try_stream!({
903 let mut last_schedule_id: Option<ScheduleId> = None;
904 loop {
905 let mut conn = self.pool.get().await?;
906 let tx = conn.read_only_transaction().await?;
907
908 let rows = if let Some(last_schedule_id) = last_schedule_id {
909 let stmt = tx
910 .prepare(
911 r#"--sql
912 SELECT
913 id,
914 EXTRACT(EPOCH FROM(
915 SELECT
916 target_execution_time
917 FROM
918 ora.job
919 WHERE
920 ora.job.schedule_id = ora.schedule.id
921 ORDER BY ora.job.id DESC
922 LIMIT 1
923 ))::DOUBLE PRECISION,
924 scheduling_policy_json,
925 EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
926 EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
927 job_template_json
928 FROM
929 ora.schedule
930 WHERE
931 (
932 ora.schedule.stopped_at IS NULL
933 AND ora.schedule.active_job_id IS NULL
934 )
935 AND id > $1::UUID
936 ORDER BY id ASC
937 LIMIT 1000
938 "#,
939 )
940 .await?;
941
942 tx.query(&stmt, &[&last_schedule_id.0]).await?
943 } else {
944 let stmt = tx
945 .prepare(
946 r#"--sql
947 SELECT
948 id,
949 EXTRACT(EPOCH FROM(
950 SELECT
951 target_execution_time
952 FROM
953 ora.job
954 WHERE
955 ora.job.schedule_id = ora.schedule.id
956 ORDER BY ora.job.id DESC
957 LIMIT 1
958 ))::DOUBLE PRECISION,
959 scheduling_policy_json,
960 EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
961 EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
962 job_template_json
963 FROM
964 ora.schedule
965 WHERE
966 ora.schedule.stopped_at IS NULL
967 AND ora.schedule.active_job_id IS NULL
968 ORDER BY id ASC
969 LIMIT 1000
970 "#,
971 )
972 .await?;
973
974 tx.query(&stmt, &[]).await?
975 };
976
977 tx.commit().await?;
978
979 if rows.is_empty() {
980 break;
981 }
982
983 let mut schedules = Vec::with_capacity(rows.len());
984
985 for row in rows {
986 schedules.push(PendingSchedule {
987 schedule_id: ScheduleId(row.try_get(0)?),
988 last_target_execution_time: row
989 .try_get::<_, Option<f64>>(1)?
990 .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
991 scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
992 time_range: TimeRange {
993 start: row
994 .try_get::<_, Option<f64>>(3)?
995 .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
996 end: row
997 .try_get::<_, Option<f64>>(4)?
998 .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
999 },
1000 job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
1001 });
1002 }
1003
1004 last_schedule_id = schedules.last().map(|s| s.schedule_id);
1005 yield schedules;
1006 }
1007 })
1008 }
1009
1010 async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1011 let mut conn = self.pool.get().await?;
1012
1013 loop {
1016 let mut deleted_rows = 0;
1017
1018 {
1019 let tx = conn.transaction().await?;
1020
1021 let stmt = tx
1022 .prepare(
1023 r#"--sql
1024 DELETE FROM ora.job
1025 WHERE id IN (
1026 SELECT ora.job.id
1027 FROM ora.job
1028 JOIN LATERAL (
1029 SELECT
1030 status,
1031 succeeded_at,
1032 failed_at,
1033 cancelled_at
1034 FROM
1035 ora.execution
1036 WHERE
1037 ora.execution.job_id = ora.job.id
1038 ORDER BY ora.execution.id DESC
1039 LIMIT 1
1040 ) e ON TRUE
1041 WHERE
1042 ora.job.inactive
1043 AND (
1044 e.succeeded_at < to_timestamp($1)
1045 OR e.failed_at < to_timestamp($1)
1046 OR e.cancelled_at < to_timestamp($1)
1047 )
1048 LIMIT 25
1049 FOR UPDATE SKIP LOCKED
1050 );
1051 "#,
1052 )
1053 .await?;
1054
1055 let deleted = tx
1056 .execute(
1057 &stmt,
1058 &[&before
1059 .duration_since(UNIX_EPOCH)
1060 .unwrap_or_default()
1061 .as_secs_f64()],
1062 )
1063 .await?;
1064
1065 deleted_rows += deleted;
1066
1067 tx.commit().await?;
1068 }
1069
1070 {
1071 let tx = conn.transaction().await?;
1072 let stmt = tx
1073 .prepare(
1074 r#"--sql
1075 DELETE FROM ONLY ora.schedule
1076 WHERE ctid IN (
1077 SELECT ctid
1078 FROM ora.schedule
1079 WHERE
1080 ora.schedule.stopped_at < to_timestamp($1)
1081 LIMIT 25
1082 FOR UPDATE SKIP LOCKED
1083 )
1084 "#,
1085 )
1086 .await?;
1087
1088 let deleted = tx
1089 .execute(
1090 &stmt,
1091 &[&before
1092 .duration_since(UNIX_EPOCH)
1093 .unwrap_or_default()
1094 .as_secs_f64()],
1095 )
1096 .await?;
1097
1098 tx.commit().await?;
1099
1100 deleted_rows += deleted;
1101 }
1102
1103 {
1104 let tx = conn.transaction().await?;
1105
1106 let stmt = tx
1107 .prepare(
1108 r#"--sql
1109 DELETE FROM ora.job_type
1110 WHERE
1111 NOT EXISTS (
1112 SELECT FROM ora.job
1113 WHERE
1114 ora.job.job_type_id = ora.job_type.id
1115 )
1116 AND NOT EXISTS (
1117 SELECT FROM ora.schedule
1118 WHERE
1119 ora.schedule.job_template_job_type_id = ora.job_type.id
1120 )
1121 "#,
1122 )
1123 .await?;
1124
1125 tx.execute(&stmt, &[]).await?;
1126
1127 tx.commit().await?;
1128 }
1129
1130 if deleted_rows == 0 {
1131 break;
1132 }
1133 }
1134
1135 Ok(())
1136 }
1137
1138 async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1139 loop {
1140 let mut conn = self.pool.get().await?;
1141
1142 let has_pending = {
1143 let tx = conn.read_only_transaction().await?;
1144
1145 let stmt = tx
1146 .prepare(
1147 r#"--sql
1148 SELECT 1 FROM ora.schedule
1149 WHERE
1150 ora.schedule.stopped_at IS NULL
1151 AND ora.schedule.active_job_id IS NULL
1152 LIMIT 1
1153 "#,
1154 )
1155 .await?;
1156
1157 let row = tx.query_opt(&stmt, &[]).await?;
1158
1159 tx.commit().await?;
1160
1161 row.is_some()
1162 };
1163
1164 drop(conn);
1165
1166 if has_pending {
1167 break;
1168 }
1169
1170 tokio::time::sleep(Duration::from_millis(500)).await;
1171 }
1172
1173 Ok(())
1174 }
1175}
1176
1177async fn executions_failed(
1178 tx: &DbTransaction<'_>,
1179 executions: &[FailedExecution],
1180) -> Result<Vec<JobId>> {
1181 let stmt = tx
1182 .prepare(
1183 r#"--sql
1184 UPDATE ora.execution
1185 SET
1186 failed_at = to_timestamp(t.failed_at),
1187 failure_reason = t.failure_reason
1188 FROM UNNEST(
1189 $1::UUID[],
1190 $2::DOUBLE PRECISION[],
1191 $3::TEXT[]
1192 ) AS t(execution_id, failed_at, failure_reason)
1193 WHERE
1194 execution_id = id
1195 AND status < 2
1196 RETURNING job_id;
1197 "#,
1198 )
1199 .await?;
1200
1201 let mut col_execution_id = Vec::with_capacity(executions.len());
1202 let mut col_succeeded_at = Vec::with_capacity(executions.len());
1203 let mut col_failure_reason = Vec::with_capacity(executions.len());
1204
1205 for execution in executions {
1206 col_execution_id.push(execution.execution_id.0);
1207 col_succeeded_at.push(
1208 execution
1209 .failed_at
1210 .duration_since(UNIX_EPOCH)
1211 .unwrap_or_default()
1212 .as_secs_f64(),
1213 );
1214 col_failure_reason.push(execution.failure_reason.as_str());
1215 }
1216
1217 let rows = tx
1218 .query(
1219 &stmt,
1220 &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1221 )
1222 .await?;
1223
1224 let job_ids = rows.into_iter().map(|row| JobId(row.get(0))).collect();
1225
1226 Ok(job_ids)
1227}
1228
1229async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1230 if jobs.is_empty() {
1231 return Ok(());
1232 }
1233
1234 let mut col_job_id = Vec::with_capacity(jobs.len());
1235 let mut col_execution_id = Vec::with_capacity(jobs.len());
1236
1237 for job in jobs {
1238 col_job_id.push(job.0);
1239 col_execution_id.push(Uuid::now_v7());
1240 }
1241
1242 let stmt = tx
1243 .prepare(
1244 r#"--sql
1245 INSERT INTO ora.execution (
1246 id,
1247 job_id
1248 ) SELECT * FROM UNNEST(
1249 $1::UUID[],
1250 $2::UUID[]
1251 )
1252 "#,
1253 )
1254 .await?;
1255
1256 tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1257
1258 Ok(())
1259}
1260
1261async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1262 if jobs.is_empty() {
1263 return Ok(());
1264 }
1265
1266 let mut col_job_id = Vec::with_capacity(jobs.len());
1267
1268 for job in jobs {
1269 col_job_id.push(job.0);
1270 }
1271
1272 {
1273 let stmt = tx
1274 .prepare(
1275 r#"--sql
1276 UPDATE ora.job
1277 SET inactive = TRUE
1278 WHERE id = ANY($1::UUID[])
1279 "#,
1280 )
1281 .await?;
1282
1283 tx.execute(&stmt, &[&col_job_id]).await?;
1284 }
1285
1286 {
1287 let stmt = tx
1288 .prepare(
1289 r#"--sql
1290 UPDATE ora.schedule
1291 SET
1292 active_job_id = NULL
1293 WHERE active_job_id = ANY($1::UUID[])
1294 "#,
1295 )
1296 .await?;
1297
1298 tx.execute(&stmt, &[&col_job_id]).await?;
1299 }
1300
1301 Ok(())
1302}