1#![allow(missing_docs)]
2
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use deadpool_postgres::{Pool, PoolError};
6use futures::Stream;
7use ora_backend::{
8 Backend,
9 common::{NextPageToken, TimeRange},
10 executions::{
11 ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
12 SucceededExecution,
13 },
14 executors::ExecutorId,
15 jobs::{
16 AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
17 NewJob,
18 },
19 schedules::{
20 AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
21 ScheduleId, ScheduleOrderBy, StoppedSchedule,
22 },
23};
24
25use refinery::embed_migrations;
26use thiserror::Error;
27use uuid::Uuid;
28
29use crate::{
30 db::{DbPool, DbTransaction},
31 query::{
32 jobs::{cancel_jobs, job_count},
33 schedules::schedule_count,
34 },
35};
36
37embed_migrations!("./migrations");
38
39mod db;
40mod models;
41mod query;
42
/// A PostgreSQL-backed implementation of the ora [`Backend`].
///
/// All persistent state lives in the `ora` schema (created and migrated by
/// [`PostgresBackend::new`]); connections are drawn from a deadpool pool.
pub struct PostgresBackend {
    // Shared connection pool, wrapped in the crate's `DbPool` helper.
    pool: DbPool,
}
46
/// Crate-local result alias that fixes the error type to [`Error`].
type Result<T> = core::result::Result<T, Error>;
48
/// Errors produced by [`PostgresBackend`] operations.
#[derive(Error, Debug)]
pub enum Error {
    /// A database migration failed to apply.
    #[error("{0}")]
    Migrations(#[from] refinery::Error),
    /// A connection could not be obtained from the pool.
    #[error("{0}")]
    Pool(#[from] PoolError),
    /// An error reported by the Postgres driver.
    #[error("{0}")]
    Postgres(#[from] tokio_postgres::Error),
    /// JSON (de)serialization of a policy or job template failed.
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    /// A pagination token could not be decoded.
    #[error("invalid page token: {0}")]
    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
}
63
impl PostgresBackend {
    /// Creates a backend over the given connection pool.
    ///
    /// Ensures the `ora` schema exists, then applies any pending embedded
    /// migrations (tracked in the `ora.migrations` table) before returning.
    ///
    /// # Errors
    ///
    /// Fails if a connection cannot be acquired, the schema cannot be
    /// created, or a migration fails to run.
    pub async fn new(pool: Pool) -> Result<Self> {
        let mut conn = pool.get().await?;
        // The migration table lives inside the `ora` schema, so the schema
        // must exist before the runner is invoked.
        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
        migrations::runner()
            .set_migration_table_name("ora.migrations")
            .run_async(&mut **conn)
            .await?;
        Ok(Self { pool: DbPool(pool) })
    }
}
77
78impl Backend for PostgresBackend {
79 type Error = Error;
80
    /// Upserts the given job type definitions in a single transaction.
    ///
    /// Rows are sent column-major through one `UNNEST`-based bulk statement;
    /// on conflict the row is only rewritten when one of the describable
    /// fields actually changed (the `IS DISTINCT FROM` guard), avoiding
    /// no-op row versions.
    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
        if job_types.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        // Column-major buffers: one Vec per column, one element per job type.
        let mut col_id = Vec::with_capacity(job_types.len());
        let mut col_description = Vec::with_capacity(job_types.len());
        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
        let mut col_output_schema_json = Vec::with_capacity(job_types.len());

        for job_type in job_types {
            col_id.push(job_type.id.as_str());
            col_description.push(job_type.description.as_deref());
            col_input_schema_json.push(job_type.input_schema_json.as_deref());
            col_output_schema_json.push(job_type.output_schema_json.as_deref());
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job_type (
                        id,
                        description,
                        input_schema_json,
                        output_schema_json
                    ) SELECT * FROM UNNEST(
                        $1::TEXT[],
                        $2::TEXT[],
                        $3::TEXT[],
                        $4::TEXT[]
                    ) ON CONFLICT (id) DO UPDATE SET
                        description = EXCLUDED.description,
                        input_schema_json = EXCLUDED.input_schema_json,
                        output_schema_json = EXCLUDED.output_schema_json
                    WHERE
                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_description,
                    &col_input_schema_json,
                    &col_output_schema_json,
                ],
            )
            .await?;
        }

        tx.commit().await?;

        Ok(())
    }
144
145 async fn list_job_types(&self) -> Result<Vec<JobType>> {
146 let mut conn = self.pool.get().await?;
147
148 let tx = conn.read_only_transaction().await?;
149
150 let job_types = {
151 let stmt = tx
152 .prepare(
153 r#"--sql
154 SELECT
155 id,
156 description,
157 input_schema_json,
158 output_schema_json
159 FROM
160 ora.job_type
161 ORDER BY id
162 "#,
163 )
164 .await?;
165
166 let rows = tx.query(&stmt, &[]).await?;
167
168 rows.into_iter()
169 .map(|row| {
170 Result::<_>::Ok(JobType {
171 id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
172 description: row.try_get(1)?,
173 input_schema_json: row.try_get(2)?,
174 output_schema_json: row.try_get(3)?,
175 })
176 })
177 .collect::<Result<Vec<_>>>()?
178 };
179
180 tx.commit().await?;
181
182 Ok(job_types)
183 }
184
    /// Inserts a batch of new jobs, optionally skipping the insert when jobs
    /// matching `if_not_exists` already exist.
    ///
    /// Runs as one transaction in four phases:
    /// 1. optional existence check (returns [`AddedJobs::Existing`] early),
    /// 2. bulk insert of the job rows,
    /// 3. bulk insert of the job labels,
    /// 4. pointing each originating schedule's `active_job_id` at its new
    ///    job, then creating the initial execution row for every job.
    async fn add_jobs(
        &self,
        jobs: &[NewJob],
        if_not_exists: Option<JobFilters>,
    ) -> Result<AddedJobs> {
        if jobs.is_empty() {
            return Ok(AddedJobs::Added(Vec::new()));
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists {
            let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;

            if !existing_job_ids.is_empty() {
                // Nothing to insert; report the matching jobs instead.
                tx.commit().await?;
                return Ok(AddedJobs::Existing(existing_job_ids));
            }
        }

        // These two columns outlive the insert block: ids are reused for the
        // label insert / schedule link below, schedule ids for the link.
        let mut col_id = Vec::with_capacity(jobs.len());
        let mut col_schedule_id = Vec::with_capacity(jobs.len());

        {
            let mut col_job_type_id = Vec::with_capacity(jobs.len());
            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());

            for job in jobs {
                // v7 UUIDs are time-ordered, which keeps id-based keyset
                // pagination roughly chronological.
                col_id.push(Uuid::now_v7());
                col_job_type_id.push(job.job.job_type_id.as_str());
                col_target_execution_time.push(job.job.target_execution_time);
                col_input_payload_json.push(job.job.input_payload_json.as_str());
                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
                col_schedule_id.push(job.schedule_id.map(|s| s.0));
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job (
                        id,
                        job_type_id,
                        target_execution_time,
                        input_payload_json,
                        timeout_policy_json,
                        retry_policy_json,
                        schedule_id
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TIMESTAMPTZ[],
                        $4::TEXT[],
                        $5::TEXT[],
                        $6::TEXT[],
                        $7::UUID[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_type_id,
                    &col_target_execution_time,
                    &col_input_payload_json,
                    &col_timeout_policy_json,
                    &col_retry_policy_json,
                    &col_schedule_id,
                ],
            )
            .await?;
        }

        {
            // Labels are flattened: one (job_id, key, value) row per label.
            let mut col_job_id = Vec::with_capacity(jobs.len());
            let mut col_job_label_key = Vec::with_capacity(jobs.len());
            let mut col_job_label_value = Vec::with_capacity(jobs.len());

            for (i, job) in jobs.iter().enumerate() {
                for label in &job.job.labels {
                    col_job_id.push(col_id[i]);
                    col_job_label_key.push(label.key.as_str());
                    col_job_label_value.push(label.value.as_str());
                }
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job_label (
                        job_id,
                        label_key,
                        label_value
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[&col_job_id, &col_job_label_key, &col_job_label_value],
            )
            .await?;
        }

        {
            // For jobs spawned from a schedule, mark the schedule as having
            // an active job so the scheduler does not spawn another one.
            let stmt = tx
                .prepare(
                    r#"--sql
                    UPDATE ora.schedule
                    SET
                        active_job_id = t.job_id
                    FROM UNNEST(
                        $1::UUID[],
                        $2::UUID[]
                    ) AS t(job_id, schedule_id)
                    WHERE
                        ora.schedule.id = t.schedule_id
                    "#,
                )
                .await?;

            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
        }

        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
        // Every new job gets its first execution row immediately.
        add_executions(&tx, &job_ids).await?;

        tx.commit().await?;

        Ok(AddedJobs::Added(job_ids))
    }
329
330 async fn list_jobs(
331 &self,
332 filters: JobFilters,
333 order_by: Option<JobOrderBy>,
334 page_size: u32,
335 page_token: Option<NextPageToken>,
336 ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
337 let mut conn = self.pool.get().await?;
338 let tx = conn.read_only_transaction().await?;
339
340 let (jobs, next_page_token) = query::jobs::job_details(
341 &tx,
342 filters,
343 order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
344 page_size,
345 page_token.map(|t| t.0),
346 )
347 .await?;
348
349 tx.commit().await?;
350
351 Ok((jobs, next_page_token))
352 }
353
354 async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
355 let mut conn = self.pool.get().await?;
356 let tx = conn.read_only_transaction().await?;
357 let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
358 tx.commit().await?;
359 Ok(count)
360 }
361
362 async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
363 let mut conn = self.pool.get().await?;
364 let tx = conn.transaction().await?;
365 let now = std::time::SystemTime::now();
366
367 let jobs = cancel_jobs(&tx, filters).await?;
368 mark_jobs_inactive(&tx, &jobs.iter().map(|j| (j.job_id, now)).collect::<Vec<_>>()).await?;
369 tx.commit().await?;
370
371 Ok(jobs)
372 }
373
    /// Inserts a batch of schedule definitions, optionally skipping when
    /// schedules matching `if_not_exists` already exist.
    ///
    /// One transaction, two bulk inserts: the schedule rows, then their
    /// labels. The job template and scheduling policy are stored as JSON.
    async fn add_schedules(
        &self,
        schedules: &[ScheduleDefinition],
        if_not_exists: Option<ScheduleFilters>,
    ) -> Result<AddedSchedules> {
        if schedules.is_empty() {
            return Ok(AddedSchedules::Added(Vec::new()));
        }

        let mut conn = self.pool.get().await?;
        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists {
            let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;

            if !existing_schedule_ids.is_empty() {
                // Nothing to insert; report the matching schedules instead.
                tx.commit().await?;
                return Ok(AddedSchedules::Existing(existing_schedule_ids));
            }
        }

        // Column-major buffers for the UNNEST bulk insert.
        let mut col_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_json = Vec::with_capacity(schedules.len());
        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
        let mut col_start_after = Vec::with_capacity(schedules.len());
        let mut col_end_before = Vec::with_capacity(schedules.len());

        for schedule in schedules {
            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);

            // v7 UUIDs keep id order roughly chronological.
            col_id.push(Uuid::now_v7());
            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
            col_start_after.push(start_after);
            col_end_before.push(end_before);
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.schedule (
                        id,
                        job_template_job_type_id,
                        job_template_json,
                        scheduling_policy_json,
                        start_after,
                        end_before
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[],
                        $4::TEXT[],
                        $5::TIMESTAMPTZ[],
                        $6::TIMESTAMPTZ[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_template_job_type_id,
                    &col_job_template_json,
                    &col_scheduling_policy_json,
                    &col_start_after,
                    &col_end_before,
                ],
            )
            .await?;
        }

        {
            // Labels are flattened: one (schedule_id, key, value) row each.
            let mut col_schedule_id = Vec::with_capacity(schedules.len());
            let mut col_schedule_label_key = Vec::with_capacity(schedules.len());
            let mut col_schedule_label_value = Vec::with_capacity(schedules.len());

            for (i, schedule) in schedules.iter().enumerate() {
                for label in &schedule.labels {
                    col_schedule_id.push(col_id[i]);
                    col_schedule_label_key.push(label.key.as_str());
                    col_schedule_label_value.push(label.value.as_str());
                }
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.schedule_label (
                        schedule_id,
                        label_key,
                        label_value
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_schedule_id,
                    &col_schedule_label_key,
                    &col_schedule_label_value,
                ],
            )
            .await?;
        }

        tx.commit().await?;

        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();

        Ok(AddedSchedules::Added(schedule_ids))
    }
496
497 async fn list_schedules(
498 &self,
499 filters: ScheduleFilters,
500 order_by: Option<ScheduleOrderBy>,
501 page_size: u32,
502 page_token: Option<NextPageToken>,
503 ) -> Result<(
504 Vec<ScheduleDetails>,
505 Option<ora_backend::common::NextPageToken>,
506 )> {
507 let mut conn = self.pool.get().await?;
508 let tx = conn.read_only_transaction().await?;
509
510 let (schedules, next_page_token) = query::schedules::schedule_details(
511 &tx,
512 filters,
513 order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
514 page_size,
515 page_token.map(|t| t.0),
516 )
517 .await?;
518
519 tx.commit().await?;
520
521 Ok((schedules, next_page_token))
522 }
523
524 async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
525 let mut conn = self.pool.get().await?;
526 let tx = conn.read_only_transaction().await?;
527 let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
528 tx.commit().await?;
529 Ok(count)
530 }
531
532 async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
533 let mut conn = self.pool.get().await?;
534 let tx = conn.transaction().await?;
535 let schedules = query::schedules::stop_schedules(&tx, filters).await?;
536 tx.commit().await?;
537 Ok(schedules)
538 }
539
    /// Streams batches (up to 1000 rows each) of executions that are ready
    /// to run: status `0` with a target execution time in the past.
    ///
    /// Batches are keyset-paginated on the execution id (a v7 UUID, so
    /// roughly time-ordered); the stream terminates once a page comes back
    /// empty. Each batch is read in its own short read-only transaction.
    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
        async_stream::try_stream!({
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                let rows = if let Some(last_execution_id) = last_execution_id {
                    // Subsequent pages: resume strictly after the last id.
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    // First page: same query without the id cursor.
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut ready_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    ready_executions.push(ReadyExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
                        input_payload_json: row.try_get(3)?,
                        // The total number of execution rows for the job
                        // doubles as the attempt counter.
                        attempt_number: row.try_get::<_, i64>(4)? as u64,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        // Timestamps travel as epoch seconds (f64).
                        target_execution_time: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
                    });
                }

                last_execution_id = ready_executions.last().map(|e| e.execution_id);

                yield ready_executions;
            }
        })
    }
632
    /// Blocks until at least one execution (not in `ignore`) is due to run.
    ///
    /// Polls the earliest pending target execution time. If it is already in
    /// the past, returns immediately; if it is in the future, sleeps until
    /// that instant and returns; if there is none, polls again every 500 ms.
    /// The connection is dropped before sleeping so it returns to the pool.
    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();

        loop {
            let mut conn = self.pool.get().await?;

            let next_timestamp = {
                let tx = conn.read_only_transaction().await?;

                // Earliest target time among pending (status 0) executions
                // that the caller has not asked us to ignore.
                let stmt = tx
                    .prepare(
                        r#"--sql
                        SELECT
                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
                        FROM
                            ora.job
                        JOIN ora.execution ON
                            ora.execution.job_id = ora.job.id
                        WHERE
                            ora.execution.status = 0
                            AND NOT (ora.execution.id = ANY($1::UUID[]))
                        ORDER BY
                            ora.job.target_execution_time ASC
                        LIMIT 1
                        "#,
                    )
                    .await?;

                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;

                tx.commit().await?;

                match row {
                    Some(row) => row
                        .try_get::<_, Option<f64>>(0)?
                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                    None => None,
                }
            };

            // Return the connection to the pool before any sleeping.
            drop(conn);

            if let Some(next_timestamp) = next_timestamp {
                let now = std::time::SystemTime::now();

                if next_timestamp <= now {
                    // Already due.
                    break;
                }

                // Due in the future: sleep until then, then report ready.
                tokio::time::sleep(
                    next_timestamp
                        .duration_since(now)
                        .unwrap_or_else(|_| Duration::from_secs(0)),
                )
                .await;
                break;
            }

            // No pending execution at all yet; poll again shortly.
            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        Ok(())
    }
696
    /// Streams batches (up to 1000 rows each) of executions currently being
    /// worked on (status `1`), e.g. for timeout supervision.
    ///
    /// Same keyset-pagination scheme as `ready_executions`: pages are
    /// ordered by execution id, each page read in its own read-only
    /// transaction, and the stream ends on the first empty page.
    fn in_progress_executions(
        &self,
    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
        async_stream::try_stream!({
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                let rows = if let Some(last_execution_id) = last_execution_id {
                    // Subsequent pages: resume strictly after the last id.
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    // First page: same query without the id cursor.
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut in_progress_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    in_progress_executions.push(InProgressExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        executor_id: ExecutorId(row.try_get(2)?),
                        // Timestamps travel as epoch seconds (f64).
                        target_execution_time: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
                        started_at: UNIX_EPOCH
                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
                        // The total number of execution rows for the job
                        // doubles as the attempt counter.
                        attempt_number: row.try_get::<_, i64>(7)? as u64,
                    });
                }

                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);

                yield in_progress_executions;
            }
        })
    }
793
    /// Records that the given executions have been picked up by executors.
    ///
    /// Bulk-updates `executor_id` and `started_at` via UNNEST. The
    /// `started_at IS NULL` guard makes the call idempotent: only the first
    /// reported start for an execution is recorded.
    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.execution
                SET
                    executor_id = t.executor_id,
                    started_at = to_timestamp(t.started_at)
                FROM UNNEST(
                    $1::UUID[],
                    $2::UUID[],
                    $3::DOUBLE PRECISION[]
                ) AS t(execution_id, executor_id, started_at)
                WHERE
                    execution_id = id
                    AND ora.execution.started_at IS NULL
                "#,
            )
            .await?;

        let mut col_execution_id = Vec::with_capacity(executions.len());
        let mut col_executor_id = Vec::with_capacity(executions.len());
        let mut col_started_at = Vec::with_capacity(executions.len());

        for execution in executions {
            col_execution_id.push(execution.execution_id.0);
            col_executor_id.push(execution.executor_id.0);
            // Timestamps are sent as epoch seconds for `to_timestamp`.
            col_started_at.push(
                execution
                    .started_at
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
        }

        tx.execute(
            &stmt,
            &[&col_execution_id, &col_executor_id, &col_started_at],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }
848
    /// Records successful completion of the given executions.
    ///
    /// Bulk-updates `succeeded_at` and the output payload, then marks the
    /// affected jobs inactive (which also frees their schedules). The
    /// `status < 2` guard skips executions that already reached a terminal
    /// state; NOTE(review): the exact status encoding is defined by the
    /// migrations — 0 and 1 are ready/in-progress per the queries above,
    /// values >= 2 appear to be terminal — confirm against the schema.
    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
        if executions.is_empty() {
            return Ok(());
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.execution
                SET
                    succeeded_at = to_timestamp(t.succeeded_at),
                    output_json = t.output_json
                FROM UNNEST(
                    $1::UUID[],
                    $2::DOUBLE PRECISION[],
                    $3::TEXT[]
                ) AS t(execution_id, succeeded_at, output_json)
                WHERE
                    execution_id = id
                    AND status < 2
                RETURNING job_id, t.succeeded_at
                "#,
            )
            .await?;

        let mut col_execution_id = Vec::with_capacity(executions.len());
        let mut col_succeeded_at = Vec::with_capacity(executions.len());
        let mut col_output_json = Vec::with_capacity(executions.len());

        for execution in executions {
            col_execution_id.push(execution.execution_id.0);
            // Timestamps are sent as epoch seconds for `to_timestamp`.
            col_succeeded_at.push(
                execution
                    .succeeded_at
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64(),
            );
            col_output_json.push(execution.output_json.as_str());
        }

        let rows = tx
            .query(
                &stmt,
                &[&col_execution_id, &col_succeeded_at, &col_output_json],
            )
            .await?;

        // RETURNING tells us which jobs were actually affected; those become
        // inactive as of their success timestamp.
        let mut jobs = Vec::with_capacity(rows.len());
        for row in rows {
            jobs.push((
                JobId(row.try_get(0)?),
                UNIX_EPOCH + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(1)?),
            ));
        }

        mark_jobs_inactive(&tx, &jobs).await?;

        tx.commit().await?;

        Ok(())
    }
915
916 async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
917 if executions.is_empty() {
918 return Ok(());
919 }
920
921 let mut conn = self.pool.get().await?;
922 let tx = conn.transaction().await?;
923
924 let jobs = executions_failed(&tx, executions).await?;
925
926 mark_jobs_inactive(&tx, &jobs).await?;
927
928 tx.commit().await?;
929
930 Ok(())
931 }
932
933 async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
934 if executions.is_empty() {
935 return Ok(());
936 }
937
938 let mut conn = self.pool.get().await?;
939 let tx = conn.transaction().await?;
940
941 let jobs = executions_failed(&tx, executions).await?;
942 add_executions(
943 &tx,
944 &jobs.into_iter().map(|(id, ..)| id).collect::<Vec<_>>(),
945 )
946 .await?;
947
948 tx.commit().await?;
949
950 Ok(())
951 }
952
    /// Streams batches (up to 1000 rows each) of schedules that need a new
    /// job spawned: not stopped and with no currently active job.
    ///
    /// Pages are keyset-paginated on the schedule id; the correlated
    /// subquery fetches the target execution time of the schedule's most
    /// recently created job (by id, i.e. creation order for v7 UUIDs) so the
    /// scheduler can compute the next occurrence. The stream ends on the
    /// first empty page.
    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
        async_stream::try_stream!({
            let mut last_schedule_id: Option<ScheduleId> = None;
            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                let rows = if let Some(last_schedule_id) = last_schedule_id {
                    // Subsequent pages: resume strictly after the last id.
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                (
                                    ora.schedule.stopped_at IS NULL
                                    AND ora.schedule.active_job_id IS NULL
                                )
                                AND id > $1::UUID
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_schedule_id.0]).await?
                } else {
                    // First page: same query without the id cursor.
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                ora.schedule.stopped_at IS NULL
                                AND ora.schedule.active_job_id IS NULL
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut schedules = Vec::with_capacity(rows.len());

                for row in rows {
                    schedules.push(PendingSchedule {
                        schedule_id: ScheduleId(row.try_get(0)?),
                        // NULL when the schedule has not spawned a job yet.
                        last_target_execution_time: row
                            .try_get::<_, Option<f64>>(1)?
                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
                        time_range: TimeRange {
                            start: row
                                .try_get::<_, Option<f64>>(3)?
                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                            end: row
                                .try_get::<_, Option<f64>>(4)?
                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
                        },
                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
                    });
                }

                last_schedule_id = schedules.last().map(|s| s.schedule_id);
                yield schedules;
            }
        })
    }
1061
    /// Deletes historical data older than `before`.
    ///
    /// Loops over small batches so each transaction stays short:
    /// 1. jobs inactive since before the cutoff (25 at a time, `FOR UPDATE
    ///    SKIP LOCKED` so concurrent cleaners don't block each other),
    /// 2. schedules stopped before the cutoff (same batching),
    /// 3. job types no longer referenced by any job or schedule.
    ///
    /// The loop ends once a full pass deletes no job or schedule rows (the
    /// job-type sweep does not count toward that total).
    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
        let mut conn = self.pool.get().await?;

        loop {
            let mut deleted_rows = 0;

            {
                let tx = conn.transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ora.job
                        WHERE id IN (
                            SELECT ora.job.id
                            FROM ora.job
                            WHERE
                                ora.job.inactive_since < to_timestamp($1)
                            LIMIT 25
                            FOR UPDATE SKIP LOCKED
                        );
                        "#,
                    )
                    .await?;

                // The cutoff is sent as epoch seconds for `to_timestamp`.
                let deleted = tx
                    .execute(
                        &stmt,
                        &[&before
                            .duration_since(UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_secs_f64()],
                    )
                    .await?;

                deleted_rows += deleted;

                tx.commit().await?;
            }

            {
                let tx = conn.transaction().await?;
                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ONLY ora.schedule
                        WHERE ctid IN (
                            SELECT ctid
                            FROM ora.schedule
                            WHERE
                                ora.schedule.stopped_at < to_timestamp($1)
                            LIMIT 25
                            FOR UPDATE SKIP LOCKED
                        )
                        "#,
                    )
                    .await?;

                let deleted = tx
                    .execute(
                        &stmt,
                        &[&before
                            .duration_since(UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_secs_f64()],
                    )
                    .await?;

                tx.commit().await?;

                deleted_rows += deleted;
            }

            {
                // Garbage-collect job types that nothing references anymore.
                let tx = conn.transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ora.job_type
                        WHERE
                            NOT EXISTS (
                                SELECT FROM ora.job
                                WHERE
                                    ora.job.job_type_id = ora.job_type.id
                            )
                            AND NOT EXISTS (
                                SELECT FROM ora.schedule
                                WHERE
                                    ora.schedule.job_template_job_type_id = ora.job_type.id
                            )
                        "#,
                    )
                    .await?;

                tx.execute(&stmt, &[]).await?;

                tx.commit().await?;
            }

            if deleted_rows == 0 {
                break;
            }
        }

        Ok(())
    }
1171
1172 async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1173 loop {
1174 let mut conn = self.pool.get().await?;
1175
1176 let has_pending = {
1177 let tx = conn.read_only_transaction().await?;
1178
1179 let stmt = tx
1180 .prepare(
1181 r#"--sql
1182 SELECT 1 FROM ora.schedule
1183 WHERE
1184 ora.schedule.stopped_at IS NULL
1185 AND ora.schedule.active_job_id IS NULL
1186 LIMIT 1
1187 "#,
1188 )
1189 .await?;
1190
1191 let row = tx.query_opt(&stmt, &[]).await?;
1192
1193 tx.commit().await?;
1194
1195 row.is_some()
1196 };
1197
1198 drop(conn);
1199
1200 if has_pending {
1201 break;
1202 }
1203
1204 tokio::time::sleep(Duration::from_millis(500)).await;
1205 }
1206
1207 Ok(())
1208 }
1209}
1210
1211async fn executions_failed(
1212 tx: &DbTransaction<'_>,
1213 executions: &[FailedExecution],
1214) -> Result<Vec<(JobId, SystemTime)>> {
1215 let stmt = tx
1216 .prepare(
1217 r#"--sql
1218 UPDATE ora.execution
1219 SET
1220 failed_at = to_timestamp(t.failed_at),
1221 failure_reason = t.failure_reason
1222 FROM UNNEST(
1223 $1::UUID[],
1224 $2::DOUBLE PRECISION[],
1225 $3::TEXT[]
1226 ) AS t(execution_id, failed_at, failure_reason)
1227 WHERE
1228 execution_id = id
1229 AND status < 2
1230 RETURNING job_id, t.failed_at
1231 "#,
1232 )
1233 .await?;
1234
1235 let mut col_execution_id = Vec::with_capacity(executions.len());
1236 let mut col_succeeded_at = Vec::with_capacity(executions.len());
1237 let mut col_failure_reason = Vec::with_capacity(executions.len());
1238
1239 for execution in executions {
1240 col_execution_id.push(execution.execution_id.0);
1241 col_succeeded_at.push(
1242 execution
1243 .failed_at
1244 .duration_since(UNIX_EPOCH)
1245 .unwrap_or_default()
1246 .as_secs_f64(),
1247 );
1248 col_failure_reason.push(execution.failure_reason.as_str());
1249 }
1250
1251 let rows = tx
1252 .query(
1253 &stmt,
1254 &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1255 )
1256 .await?;
1257
1258 let mut jobs = Vec::with_capacity(rows.len());
1259
1260 for row in rows {
1261 jobs.push((
1262 JobId(row.try_get(0)?),
1263 UNIX_EPOCH + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(1)?),
1264 ));
1265 }
1266
1267 Ok(jobs)
1268}
1269
1270async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1271 if jobs.is_empty() {
1272 return Ok(());
1273 }
1274
1275 let mut col_job_id = Vec::with_capacity(jobs.len());
1276 let mut col_execution_id = Vec::with_capacity(jobs.len());
1277
1278 for job in jobs {
1279 col_job_id.push(job.0);
1280 col_execution_id.push(Uuid::now_v7());
1281 }
1282
1283 let stmt = tx
1284 .prepare(
1285 r#"--sql
1286 INSERT INTO ora.execution (
1287 id,
1288 job_id
1289 ) SELECT * FROM UNNEST(
1290 $1::UUID[],
1291 $2::UUID[]
1292 )
1293 "#,
1294 )
1295 .await?;
1296
1297 tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1298
1299 Ok(())
1300}
1301
/// Marks the given jobs inactive as of the paired timestamps, inside the
/// given transaction.
///
/// Two bulk statements: stamp `inactive_since` on each job, then clear
/// `active_job_id` on any schedule still pointing at one of these jobs so
/// the scheduler can spawn that schedule's next job. No-op for an empty
/// slice.
async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[(JobId, SystemTime)]) -> Result<()> {
    if jobs.is_empty() {
        return Ok(());
    }

    let mut col_job_id = Vec::with_capacity(jobs.len());
    let mut col_inactive_since = Vec::with_capacity(jobs.len());

    for (job, inactive_since) in jobs {
        col_job_id.push(job.0);
        // Timestamps are sent as epoch seconds for `to_timestamp`.
        col_inactive_since.push(
            inactive_since
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs_f64(),
        );
    }

    {
        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.job
                SET inactive_since = to_timestamp(t.inactive_since)
                FROM
                    UNNEST(
                        $1::UUID[],
                        $2::DOUBLE PRECISION[]
                    ) AS t(job_id, inactive_since)
                WHERE
                    id = t.job_id;
                "#,
            )
            .await?;

        tx.execute(&stmt, &[&col_job_id, &col_inactive_since])
            .await?;
    }

    {
        let stmt = tx
            .prepare(
                r#"--sql
                UPDATE ora.schedule
                SET
                    active_job_id = NULL
                WHERE active_job_id = ANY($1::UUID[])
                "#,
            )
            .await?;

        tx.execute(&stmt, &[&col_job_id]).await?;
    }

    Ok(())
}