1#![allow(missing_docs)]
3
4use std::time::{Duration, SystemTime};
5
6use deadpool_postgres::{Pool, PoolError};
7use futures::Stream;
8use ora_backend::{
9 Backend,
10 common::{NextPageToken, TimeRange},
11 executions::{
12 ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
13 SucceededExecution,
14 },
15 executors::ExecutorId,
16 jobs::{
17 AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
18 NewJob,
19 },
20 schedules::{
21 AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
22 ScheduleId, ScheduleOrderBy, StoppedSchedule,
23 },
24};
25
26use refinery::embed_migrations;
27use thiserror::Error;
28use uuid::Uuid;
29
30use crate::{
31 db::{DbPool, DbTransaction},
32 query::{
33 jobs::{cancel_jobs, job_count},
34 schedules::schedule_count,
35 },
36 util::{systemtime_from_ts, systemtime_to_ts},
37};
38
39embed_migrations!("./migrations");
40
41mod db;
42mod models;
43mod query;
44mod util;
45
/// A PostgreSQL-backed implementation of the Ora [`Backend`] trait.
///
/// All state (job types, jobs, schedules, executions and their labels)
/// lives in the `ora` schema, which is created and migrated by
/// [`PostgresBackend::new`].
#[must_use]
pub struct PostgresBackend {
    // Connection pool used for every query; each operation checks out a
    // connection and runs inside its own transaction.
    pool: DbPool,
    // Upper bound on rows removed per transaction by `delete_history`
    // (defaults to 40_000; see `with_delete_batch_size`).
    delete_batch_size: usize,
}
52
/// Crate-local result alias over [`Error`].
type Result<T> = core::result::Result<T, Error>;

/// Errors produced by [`PostgresBackend`] operations.
#[derive(Error, Debug)]
pub enum Error {
    /// An embedded database migration failed to apply.
    #[error("{0}")]
    Migrations(#[from] refinery::Error),
    /// A connection could not be obtained from the pool.
    #[error("{0}")]
    Pool(#[from] PoolError),
    /// An error reported by the Postgres driver.
    #[error("{0}")]
    Postgres(#[from] tokio_postgres::Error),
    /// JSON (de)serialization of a payload or policy failed.
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    /// A caller-supplied pagination token could not be decoded.
    #[error("invalid page token: {0}")]
    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
}
74
impl PostgresBackend {
    /// Creates a backend on top of a deadpool Postgres pool.
    ///
    /// Ensures the `ora` schema exists and runs the embedded migrations
    /// (tracked in `ora.migrations`) before returning.
    ///
    /// # Errors
    ///
    /// Fails if a connection cannot be obtained, the schema cannot be
    /// created, or a migration fails.
    pub async fn new(pool: Pool) -> Result<Self> {
        let mut conn = pool.get().await?;
        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
        migrations::runner()
            .set_migration_table_name("ora.migrations")
            // Double deref unwraps the pooled object down to the underlying
            // tokio-postgres client that refinery expects.
            .run_async(&mut **conn)
            .await?;
        Ok(Self {
            pool: DbPool(pool),
            // Default history-deletion batch size; override with
            // `with_delete_batch_size`.
            delete_batch_size: 40_000,
        })
    }

    /// Sets the maximum number of rows deleted per transaction by
    /// `delete_history`.
    ///
    /// # Panics
    ///
    /// Panics if `batch_size` is zero — a zero batch would make the
    /// deletion loop spin forever without making progress.
    pub fn with_delete_batch_size(mut self, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch size must be greater than zero");
        self.delete_batch_size = batch_size;
        self
    }
}
102
103impl Backend for PostgresBackend {
104 type Error = Error;
105
106 async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
107 if job_types.is_empty() {
108 return Ok(());
109 }
110
111 let mut conn = self.pool.get().await?;
112
113 let tx = conn.transaction().await?;
114
115 let mut col_id = Vec::with_capacity(job_types.len());
116 let mut col_description = Vec::with_capacity(job_types.len());
117 let mut col_input_schema_json = Vec::with_capacity(job_types.len());
118 let mut col_output_schema_json = Vec::with_capacity(job_types.len());
119
120 for job_type in job_types {
121 col_id.push(job_type.id.as_str());
122 col_description.push(job_type.description.as_deref());
123 col_input_schema_json.push(job_type.input_schema_json.as_deref());
124 col_output_schema_json.push(job_type.output_schema_json.as_deref());
125 }
126
127 {
128 let stmt = tx
129 .prepare(
130 r#"--sql
131 INSERT INTO ora.job_type (
132 id,
133 description,
134 input_schema_json,
135 output_schema_json
136 ) SELECT * FROM UNNEST(
137 $1::TEXT[],
138 $2::TEXT[],
139 $3::TEXT[],
140 $4::TEXT[]
141 ) ON CONFLICT (id) DO UPDATE SET
142 description = EXCLUDED.description,
143 input_schema_json = EXCLUDED.input_schema_json,
144 output_schema_json = EXCLUDED.output_schema_json
145 WHERE
146 ora.job_type.description IS DISTINCT FROM EXCLUDED.description
147 OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
148 OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
149 "#,
150 )
151 .await?;
152
153 tx.execute(
154 &stmt,
155 &[
156 &col_id,
157 &col_description,
158 &col_input_schema_json,
159 &col_output_schema_json,
160 ],
161 )
162 .await?;
163 }
164
165 tx.commit().await?;
166
167 Ok(())
168 }
169
170 async fn list_job_types(&self) -> Result<Vec<JobType>> {
171 let mut conn = self.pool.get().await?;
172
173 let tx = conn.read_only_transaction().await?;
174
175 let job_types = {
176 let stmt = tx
177 .prepare(
178 r#"--sql
179 SELECT
180 id,
181 description,
182 input_schema_json,
183 output_schema_json
184 FROM
185 ora.job_type
186 ORDER BY id
187 "#,
188 )
189 .await?;
190
191 let rows = tx.query(&stmt, &[]).await?;
192
193 rows.into_iter()
194 .map(|row| {
195 Result::<_>::Ok(JobType {
196 id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
197 description: row.try_get(1)?,
198 input_schema_json: row.try_get(2)?,
199 output_schema_json: row.try_get(3)?,
200 })
201 })
202 .collect::<Result<Vec<_>>>()?
203 };
204
205 tx.commit().await?;
206
207 Ok(job_types)
208 }
209
    /// Inserts a batch of jobs plus their labels, links schedule-spawned
    /// jobs back to their schedules, and creates one initial execution per
    /// job — all within a single transaction.
    ///
    /// When `if_not_exists` is given and any existing job matches the
    /// filters, nothing is inserted and the matching ids are returned as
    /// `AddedJobs::Existing` instead.
    async fn add_jobs(
        &self,
        jobs: &[NewJob],
        if_not_exists: Option<JobFilters>,
    ) -> Result<AddedJobs> {
        if jobs.is_empty() {
            return Ok(AddedJobs::Added(Vec::new()));
        }

        let mut conn = self.pool.get().await?;

        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists {
            let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;

            if !existing_job_ids.is_empty() {
                tx.commit().await?;
                return Ok(AddedJobs::Existing(existing_job_ids));
            }
        }

        // These two outlive the first scope: `col_id` is reused for the
        // label insert, the schedule back-link update, and the final
        // execution insert; `col_schedule_id` for the back-link update.
        let mut col_id = Vec::with_capacity(jobs.len());
        let mut col_schedule_id = Vec::with_capacity(jobs.len());

        {
            // Column-major buffers for a single UNNEST-based bulk insert.
            let mut col_job_type_id = Vec::with_capacity(jobs.len());
            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());

            for job in jobs {
                // v7 UUIDs are time-ordered, so insertion order matches
                // id order for later keyset pagination.
                col_id.push(Uuid::now_v7());
                col_job_type_id.push(job.job.job_type_id.as_str());
                col_target_execution_time.push(job.job.target_execution_time);
                col_input_payload_json.push(job.job.input_payload_json.as_str());
                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
                col_schedule_id.push(job.schedule_id.map(|s| s.0));
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job (
                        id,
                        job_type_id,
                        target_execution_time,
                        input_payload_json,
                        timeout_policy_json,
                        retry_policy_json,
                        schedule_id
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TIMESTAMPTZ[],
                        $4::TEXT[],
                        $5::TEXT[],
                        $6::TEXT[],
                        $7::UUID[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_type_id,
                    &col_target_execution_time,
                    &col_input_payload_json,
                    &col_timeout_policy_json,
                    &col_retry_policy_json,
                    &col_schedule_id,
                ],
            )
            .await?;
        }

        {
            // Flatten each job's labels into (job_id, key, value) rows.
            let mut col_job_id = Vec::with_capacity(jobs.len());
            let mut col_job_label_key = Vec::with_capacity(jobs.len());
            let mut col_job_label_value = Vec::with_capacity(jobs.len());

            for (i, job) in jobs.iter().enumerate() {
                for label in &job.job.labels {
                    col_job_id.push(col_id[i]);
                    col_job_label_key.push(label.key.as_str());
                    col_job_label_value.push(label.value.as_str());
                }
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.job_label (
                        job_id,
                        label_key,
                        label_value
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[&col_job_id, &col_job_label_key, &col_job_label_value],
            )
            .await?;
        }

        {
            // Point each originating schedule at its newly-spawned job;
            // rows with a NULL schedule_id simply match no schedule.
            let stmt = tx
                .prepare(
                    r#"--sql
                    UPDATE ora.schedule
                    SET
                        active_job_id = t.job_id
                    FROM UNNEST(
                        $1::UUID[],
                        $2::UUID[]
                    ) AS t(job_id, schedule_id)
                    WHERE
                        ora.schedule.id = t.schedule_id
                    "#,
                )
                .await?;

            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
        }

        // Every new job gets its first (pending) execution row.
        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
        add_executions(&tx, &job_ids).await?;

        tx.commit().await?;

        Ok(AddedJobs::Added(job_ids))
    }
354
355 async fn list_jobs(
356 &self,
357 filters: JobFilters,
358 order_by: Option<JobOrderBy>,
359 page_size: u32,
360 page_token: Option<NextPageToken>,
361 ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
362 let mut conn = self.pool.get().await?;
363 let tx = conn.read_only_transaction().await?;
364
365 let (jobs, next_page_token) = query::jobs::job_details(
366 &tx,
367 filters,
368 order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
369 page_size,
370 page_token.map(|t| t.0),
371 )
372 .await?;
373
374 tx.commit().await?;
375
376 Ok((jobs, next_page_token))
377 }
378
379 async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
380 let mut conn = self.pool.get().await?;
381 let tx = conn.read_only_transaction().await?;
382 let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
383 tx.commit().await?;
384 Ok(count)
385 }
386
387 async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
388 let mut conn = self.pool.get().await?;
389 let tx = conn.transaction().await?;
390 let now = std::time::SystemTime::now();
391
392 let jobs = cancel_jobs(&tx, filters).await?;
393 mark_jobs_inactive(
394 &tx,
395 &jobs.iter().map(|j| (j.job_id, now)).collect::<Vec<_>>(),
396 )
397 .await?;
398 tx.commit().await?;
399
400 Ok(jobs)
401 }
402
    /// Inserts a batch of schedule definitions plus their labels in a
    /// single transaction.
    ///
    /// When `if_not_exists` is given and any existing schedule matches the
    /// filters, nothing is inserted and the matching ids are returned as
    /// `AddedSchedules::Existing` instead.
    async fn add_schedules(
        &self,
        schedules: &[ScheduleDefinition],
        if_not_exists: Option<ScheduleFilters>,
    ) -> Result<AddedSchedules> {
        if schedules.is_empty() {
            return Ok(AddedSchedules::Added(Vec::new()));
        }

        let mut conn = self.pool.get().await?;
        let tx = conn.transaction().await?;

        if let Some(filters) = if_not_exists {
            let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;

            if !existing_schedule_ids.is_empty() {
                tx.commit().await?;
                return Ok(AddedSchedules::Existing(existing_schedule_ids));
            }
        }

        // Column-major buffers for a single UNNEST-based bulk insert.
        // `col_id` is reused below for the label insert and the returned ids.
        let mut col_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
        let mut col_job_template_json = Vec::with_capacity(schedules.len());
        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
        let mut col_start_after = Vec::with_capacity(schedules.len());
        let mut col_end_before = Vec::with_capacity(schedules.len());

        for schedule in schedules {
            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);

            // v7 UUIDs are time-ordered, matching the keyset pagination
            // used by `pending_schedules`.
            col_id.push(Uuid::now_v7());
            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
            col_start_after.push(start_after);
            col_end_before.push(end_before);
        }

        {
            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.schedule (
                        id,
                        job_template_job_type_id,
                        job_template_json,
                        scheduling_policy_json,
                        start_after,
                        end_before
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[],
                        $4::TEXT[],
                        $5::TIMESTAMPTZ[],
                        $6::TIMESTAMPTZ[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_id,
                    &col_job_template_job_type_id,
                    &col_job_template_json,
                    &col_scheduling_policy_json,
                    &col_start_after,
                    &col_end_before,
                ],
            )
            .await?;
        }

        {
            // Flatten each schedule's labels into (schedule_id, key, value).
            let mut col_schedule_id = Vec::with_capacity(schedules.len());
            let mut col_schedule_label_key = Vec::with_capacity(schedules.len());
            let mut col_schedule_label_value = Vec::with_capacity(schedules.len());

            for (i, schedule) in schedules.iter().enumerate() {
                for label in &schedule.labels {
                    col_schedule_id.push(col_id[i]);
                    col_schedule_label_key.push(label.key.as_str());
                    col_schedule_label_value.push(label.value.as_str());
                }
            }

            let stmt = tx
                .prepare(
                    r#"--sql
                    INSERT INTO ora.schedule_label (
                        schedule_id,
                        label_key,
                        label_value
                    ) SELECT * FROM UNNEST(
                        $1::UUID[],
                        $2::TEXT[],
                        $3::TEXT[]
                    )
                    "#,
                )
                .await?;

            tx.execute(
                &stmt,
                &[
                    &col_schedule_id,
                    &col_schedule_label_key,
                    &col_schedule_label_value,
                ],
            )
            .await?;
        }

        tx.commit().await?;

        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();

        Ok(AddedSchedules::Added(schedule_ids))
    }
525
526 async fn list_schedules(
527 &self,
528 filters: ScheduleFilters,
529 order_by: Option<ScheduleOrderBy>,
530 page_size: u32,
531 page_token: Option<NextPageToken>,
532 ) -> Result<(
533 Vec<ScheduleDetails>,
534 Option<ora_backend::common::NextPageToken>,
535 )> {
536 let mut conn = self.pool.get().await?;
537 let tx = conn.read_only_transaction().await?;
538
539 let (schedules, next_page_token) = query::schedules::schedule_details(
540 &tx,
541 filters,
542 order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
543 page_size,
544 page_token.map(|t| t.0),
545 )
546 .await?;
547
548 tx.commit().await?;
549
550 Ok((schedules, next_page_token))
551 }
552
553 async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
554 let mut conn = self.pool.get().await?;
555 let tx = conn.read_only_transaction().await?;
556 let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
557 tx.commit().await?;
558 Ok(count)
559 }
560
561 async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
562 let mut conn = self.pool.get().await?;
563 let tx = conn.transaction().await?;
564 let schedules = query::schedules::stop_schedules(&tx, filters).await?;
565 tx.commit().await?;
566 Ok(schedules)
567 }
568
    /// Streams due, not-yet-started executions (status 0 with a target
    /// time in the past) in pages of up to 1000, ordered by execution id.
    ///
    /// Pagination is keyset-based on the (time-ordered, v7 UUID) execution
    /// id; each page is fetched in its own short read-only transaction and
    /// the stream ends once a page comes back empty.
    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
        async_stream::try_stream!({
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                // Two nearly identical queries; the first adds the keyset
                // predicate `id > $1` for pages after the first.
                let rows = if let Some(last_execution_id) = last_execution_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.job.job_type_id,
                                ora.job.input_payload_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
                                ora.job.retry_policy_json,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 0
                                AND ora.job.target_execution_time <= NOW()
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut ready_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    ready_executions.push(ReadyExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
                        input_payload_json: row.try_get(3)?,
                        // Attempt number is derived from the total count of
                        // execution rows for this job (COUNT >= 0, so the
                        // i64 -> u64 cast cannot wrap).
                        attempt_number: row.try_get::<_, i64>(4)? as u64,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        target_execution_time: systemtime_from_ts(row.try_get::<_, f64>(6)?),
                    });
                }

                last_execution_id = ready_executions.last().map(|e| e.execution_id);

                yield ready_executions;
            }
        })
    }
660
    /// Blocks until an execution (other than those in `ignore`) is due.
    ///
    /// Polls the earliest pending execution's target time: returns at once
    /// if it is already due, otherwise sleeps until that time and returns
    /// (without re-checking — callers are expected to re-poll). When no
    /// pending execution exists, retries every 500 ms.
    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();

        loop {
            let mut conn = self.pool.get().await?;

            // Earliest target time among pending (status 0), non-ignored
            // executions, as epoch seconds; None if nothing is pending.
            let next_timestamp = {
                let tx = conn.read_only_transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        SELECT
                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
                        FROM
                            ora.job
                        JOIN ora.execution ON
                            ora.execution.job_id = ora.job.id
                        WHERE
                            ora.execution.status = 0
                            AND NOT (ora.execution.id = ANY($1::UUID[]))
                        ORDER BY
                            ora.job.target_execution_time ASC
                        LIMIT 1
                        "#,
                    )
                    .await?;

                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;

                tx.commit().await?;

                match row {
                    Some(row) => row.try_get::<_, Option<f64>>(0)?.map(systemtime_from_ts),
                    None => None,
                }
            };

            // Release the connection before any sleeping so it is not held
            // out of the pool while idle.
            drop(conn);

            if let Some(next_timestamp) = next_timestamp {
                let now = std::time::SystemTime::now();

                if next_timestamp <= now {
                    break;
                }

                // `duration_since` only errs when the target is in the
                // past, which the check above rules out; fall back to 0.
                tokio::time::sleep(
                    next_timestamp
                        .duration_since(now)
                        .unwrap_or_else(|_| Duration::from_secs(0)),
                )
                .await;
                break;
            }

            // Nothing pending at all: poll again shortly.
            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        Ok(())
    }
722
    /// Streams executions that have been started but not finished
    /// (status 1) in pages of up to 1000, ordered by execution id.
    ///
    /// Uses the same keyset pagination scheme as `ready_executions`:
    /// each page in its own read-only transaction, stream ends on the
    /// first empty page.
    fn in_progress_executions(
        &self,
    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
        async_stream::try_stream!({
            let mut last_execution_id: Option<ExecutionId> = None;

            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                // Two nearly identical queries; the first adds the keyset
                // predicate `id > $1` for pages after the first.
                let rows = if let Some(last_execution_id) = last_execution_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                                AND ora.execution.id > $1::UUID
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_execution_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                ora.execution.id,
                                ora.job.id,
                                ora.execution.executor_id,
                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
                                ora.job.timeout_policy_json,
                                ora.job.retry_policy_json,
                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
                            FROM
                                ora.execution
                            JOIN ora.job ON
                                ora.execution.job_id = ora.job.id
                            WHERE
                                status = 1
                            ORDER BY
                                ora.execution.id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut in_progress_executions = Vec::with_capacity(rows.len());

                for row in rows {
                    in_progress_executions.push(InProgressExecution {
                        execution_id: ExecutionId(row.try_get(0)?),
                        job_id: JobId(row.try_get(1)?),
                        executor_id: ExecutorId(row.try_get(2)?),
                        target_execution_time: systemtime_from_ts(row.try_get::<_, f64>(3)?),
                        started_at: systemtime_from_ts(row.try_get::<_, f64>(4)?),
                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
                        // COUNT >= 0, so the i64 -> u64 cast cannot wrap.
                        attempt_number: row.try_get::<_, i64>(7)? as u64,
                    });
                }

                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);

                yield in_progress_executions;
            }
        })
    }
817
818 async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
819 if executions.is_empty() {
820 return Ok(());
821 }
822
823 let mut conn = self.pool.get().await?;
824
825 let tx = conn.transaction().await?;
826
827 let stmt = tx
828 .prepare(
829 r#"--sql
830 UPDATE ora.execution
831 SET
832 executor_id = t.executor_id,
833 started_at = to_timestamp(t.started_at)
834 FROM UNNEST(
835 $1::UUID[],
836 $2::UUID[],
837 $3::DOUBLE PRECISION[]
838 ) AS t(execution_id, executor_id, started_at)
839 WHERE
840 execution_id = id
841 AND ora.execution.started_at IS NULL
842 "#,
843 )
844 .await?;
845
846 let mut col_execution_id = Vec::with_capacity(executions.len());
847 let mut col_executor_id = Vec::with_capacity(executions.len());
848 let mut col_started_at = Vec::with_capacity(executions.len());
849
850 for execution in executions {
851 col_execution_id.push(execution.execution_id.0);
852 col_executor_id.push(execution.executor_id.0);
853 col_started_at.push(systemtime_to_ts(execution.started_at));
854 }
855
856 tx.execute(
857 &stmt,
858 &[&col_execution_id, &col_executor_id, &col_started_at],
859 )
860 .await?;
861
862 tx.commit().await?;
863
864 Ok(())
865 }
866
867 async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
868 if executions.is_empty() {
869 return Ok(());
870 }
871
872 let mut conn = self.pool.get().await?;
873
874 let tx = conn.transaction().await?;
875
876 let stmt = tx
877 .prepare(
878 r#"--sql
879 UPDATE ora.execution
880 SET
881 succeeded_at = to_timestamp(t.succeeded_at),
882 output_json = t.output_json
883 FROM UNNEST(
884 $1::UUID[],
885 $2::DOUBLE PRECISION[],
886 $3::TEXT[]
887 ) AS t(execution_id, succeeded_at, output_json)
888 WHERE
889 execution_id = id
890 AND status < 2
891 RETURNING job_id, t.succeeded_at
892 "#,
893 )
894 .await?;
895
896 let mut col_execution_id = Vec::with_capacity(executions.len());
897 let mut col_succeeded_at = Vec::with_capacity(executions.len());
898 let mut col_output_json = Vec::with_capacity(executions.len());
899
900 for execution in executions {
901 col_execution_id.push(execution.execution_id.0);
902 col_succeeded_at.push(systemtime_to_ts(execution.succeeded_at));
903 col_output_json.push(execution.output_json.as_str());
904 }
905
906 let rows = tx
907 .query(
908 &stmt,
909 &[&col_execution_id, &col_succeeded_at, &col_output_json],
910 )
911 .await?;
912
913 let mut jobs = Vec::with_capacity(rows.len());
914 for row in rows {
915 jobs.push((
916 JobId(row.try_get(0)?),
917 systemtime_from_ts(row.try_get::<_, f64>(1)?),
918 ));
919 }
920
921 mark_jobs_inactive(&tx, &jobs).await?;
922
923 tx.commit().await?;
924
925 Ok(())
926 }
927
928 async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
929 if executions.is_empty() {
930 return Ok(());
931 }
932
933 let mut conn = self.pool.get().await?;
934 let tx = conn.transaction().await?;
935
936 let jobs = executions_failed(&tx, executions).await?;
937
938 mark_jobs_inactive(&tx, &jobs).await?;
939
940 tx.commit().await?;
941
942 Ok(())
943 }
944
945 async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
946 if executions.is_empty() {
947 return Ok(());
948 }
949
950 let mut conn = self.pool.get().await?;
951 let tx = conn.transaction().await?;
952
953 let jobs = executions_failed(&tx, executions).await?;
954 add_executions(
955 &tx,
956 &jobs.into_iter().map(|(id, ..)| id).collect::<Vec<_>>(),
957 )
958 .await?;
959
960 tx.commit().await?;
961
962 Ok(())
963 }
964
    /// Streams schedules that are neither stopped nor have an active job —
    /// i.e. those that may need a new job spawned — in pages of up to
    /// 1000, ordered by schedule id.
    ///
    /// The correlated subquery fetches the target execution time of the
    /// most recently spawned job for the schedule (latest by job id; v7
    /// UUIDs are time-ordered). Same keyset pagination as the execution
    /// streams: empty page ends the stream.
    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
        async_stream::try_stream!({
            let mut last_schedule_id: Option<ScheduleId> = None;
            loop {
                let mut conn = self.pool.get().await?;
                let tx = conn.read_only_transaction().await?;

                // Two nearly identical queries; the first adds the keyset
                // predicate `id > $1` for pages after the first.
                let rows = if let Some(last_schedule_id) = last_schedule_id {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                (
                                    ora.schedule.stopped_at IS NULL
                                    AND ora.schedule.active_job_id IS NULL
                                )
                                AND id > $1::UUID
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[&last_schedule_id.0]).await?
                } else {
                    let stmt = tx
                        .prepare(
                            r#"--sql
                            SELECT
                                id,
                                EXTRACT(EPOCH FROM(
                                    SELECT
                                        target_execution_time
                                    FROM
                                        ora.job
                                    WHERE
                                        ora.job.schedule_id = ora.schedule.id
                                    ORDER BY ora.job.id DESC
                                    LIMIT 1
                                ))::DOUBLE PRECISION,
                                scheduling_policy_json,
                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
                                job_template_json
                            FROM
                                ora.schedule
                            WHERE
                                ora.schedule.stopped_at IS NULL
                                AND ora.schedule.active_job_id IS NULL
                            ORDER BY id ASC
                            LIMIT 1000
                            "#,
                        )
                        .await?;

                    tx.query(&stmt, &[]).await?
                };

                tx.commit().await?;

                if rows.is_empty() {
                    break;
                }

                let mut schedules = Vec::with_capacity(rows.len());

                for row in rows {
                    schedules.push(PendingSchedule {
                        schedule_id: ScheduleId(row.try_get(0)?),
                        // NULL when the schedule has never spawned a job.
                        last_target_execution_time: row
                            .try_get::<_, Option<f64>>(1)?
                            .map(systemtime_from_ts),
                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
                        time_range: TimeRange {
                            start: row.try_get::<_, Option<f64>>(3)?.map(systemtime_from_ts),
                            end: row.try_get::<_, Option<f64>>(4)?.map(systemtime_from_ts),
                        },
                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
                    });
                }

                last_schedule_id = schedules.last().map(|s| s.schedule_id);
                yield schedules;
            }
        })
    }
1069
    /// Deletes historical data older than `before`, in batches of
    /// `delete_batch_size` rows per transaction to keep transactions
    /// short: jobs inactive before the cutoff, schedules stopped before
    /// the cutoff, then job types no longer referenced by any job or
    /// schedule. Loops until a full pass deletes nothing.
    ///
    /// NOTE(review): rows in `ora.execution` / label tables are presumably
    /// removed via ON DELETE CASCADE defined in the migrations — confirm.
    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
        // delete_batch_size is guaranteed non-zero (see
        // `with_delete_batch_size`), so every pass makes progress.
        let batch_size = i64::try_from(self.delete_batch_size).unwrap_or(i64::MAX);

        let mut conn = self.pool.get().await?;

        loop {
            let mut deleted_rows = 0;

            {
                // Batch-delete old inactive jobs; the ctid CTE limits the
                // delete to one batch without needing an id range.
                let tx = conn.transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        WITH cte AS (
                            SELECT ctid
                            FROM ora.job
                            WHERE inactive_since < to_timestamp($1)
                            ORDER BY inactive_since
                            LIMIT $2
                        )
                        DELETE FROM ora.job j
                        USING cte
                        WHERE j.ctid = cte.ctid;
                        "#,
                    )
                    .await?;

                let deleted = tx
                    .execute(&stmt, &[&systemtime_to_ts(before), &batch_size])
                    .await?;

                deleted_rows += deleted;

                tx.commit().await?;
            }

            {
                // Batch-delete old stopped schedules, same ctid technique.
                let tx = conn.transaction().await?;
                let stmt = tx
                    .prepare(
                        r#"--sql
                        WITH cte AS (
                            SELECT ctid
                            FROM ora.schedule
                            WHERE stopped_at < to_timestamp($1)
                            ORDER BY stopped_at
                            LIMIT $2
                        )
                        DELETE FROM ora.schedule s
                        USING cte
                        WHERE s.ctid = cte.ctid;
                        "#,
                    )
                    .await?;

                let deleted = tx
                    .execute(&stmt, &[&systemtime_to_ts(before), &batch_size])
                    .await?;

                tx.commit().await?;

                deleted_rows += deleted;
            }

            {
                // Sweep job types that no longer back any job or schedule.
                // Deliberately excluded from `deleted_rows` so the loop's
                // termination depends only on job/schedule progress.
                let tx = conn.transaction().await?;

                let stmt = tx
                    .prepare(
                        r#"--sql
                        DELETE FROM ora.job_type
                        WHERE
                            NOT EXISTS (
                                SELECT FROM ora.job
                                WHERE
                                    ora.job.job_type_id = ora.job_type.id
                            )
                            AND NOT EXISTS (
                                SELECT FROM ora.schedule
                                WHERE
                                    ora.schedule.job_template_job_type_id = ora.job_type.id
                            )
                        "#,
                    )
                    .await?;

                tx.execute(&stmt, &[]).await?;

                tx.commit().await?;
            }

            if deleted_rows == 0 {
                break;
            }
        }

        Ok(())
    }
1171
1172 async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1173 loop {
1174 let mut conn = self.pool.get().await?;
1175
1176 let has_pending = {
1177 let tx = conn.read_only_transaction().await?;
1178
1179 let stmt = tx
1180 .prepare(
1181 r#"--sql
1182 SELECT 1 FROM ora.schedule
1183 WHERE
1184 ora.schedule.stopped_at IS NULL
1185 AND ora.schedule.active_job_id IS NULL
1186 LIMIT 1
1187 "#,
1188 )
1189 .await?;
1190
1191 let row = tx.query_opt(&stmt, &[]).await?;
1192
1193 tx.commit().await?;
1194
1195 row.is_some()
1196 };
1197
1198 drop(conn);
1199
1200 if has_pending {
1201 break;
1202 }
1203
1204 tokio::time::sleep(Duration::from_millis(500)).await;
1205 }
1206
1207 Ok(())
1208 }
1209}
1210
1211async fn executions_failed(
1212 tx: &DbTransaction<'_>,
1213 executions: &[FailedExecution],
1214) -> Result<Vec<(JobId, SystemTime)>> {
1215 let stmt = tx
1216 .prepare(
1217 r#"--sql
1218 UPDATE ora.execution
1219 SET
1220 failed_at = to_timestamp(t.failed_at),
1221 failure_reason = t.failure_reason
1222 FROM UNNEST(
1223 $1::UUID[],
1224 $2::DOUBLE PRECISION[],
1225 $3::TEXT[]
1226 ) AS t(execution_id, failed_at, failure_reason)
1227 WHERE
1228 execution_id = id
1229 AND status < 2
1230 RETURNING job_id, t.failed_at
1231 "#,
1232 )
1233 .await?;
1234
1235 let mut col_execution_id = Vec::with_capacity(executions.len());
1236 let mut col_succeeded_at = Vec::with_capacity(executions.len());
1237 let mut col_failure_reason = Vec::with_capacity(executions.len());
1238
1239 for execution in executions {
1240 col_execution_id.push(execution.execution_id.0);
1241 col_succeeded_at.push(systemtime_to_ts(execution.failed_at));
1242 col_failure_reason.push(execution.failure_reason.as_str());
1243 }
1244
1245 let rows = tx
1246 .query(
1247 &stmt,
1248 &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1249 )
1250 .await?;
1251
1252 let mut jobs = Vec::with_capacity(rows.len());
1253
1254 for row in rows {
1255 jobs.push((
1256 JobId(row.try_get(0)?),
1257 systemtime_from_ts(row.try_get::<_, f64>(1)?),
1258 ));
1259 }
1260
1261 Ok(jobs)
1262}
1263
1264async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1265 if jobs.is_empty() {
1266 return Ok(());
1267 }
1268
1269 let mut col_job_id = Vec::with_capacity(jobs.len());
1270 let mut col_execution_id = Vec::with_capacity(jobs.len());
1271
1272 for job in jobs {
1273 col_job_id.push(job.0);
1274 col_execution_id.push(Uuid::now_v7());
1275 }
1276
1277 let stmt = tx
1278 .prepare(
1279 r#"--sql
1280 INSERT INTO ora.execution (
1281 id,
1282 job_id
1283 ) SELECT * FROM UNNEST(
1284 $1::UUID[],
1285 $2::UUID[]
1286 )
1287 "#,
1288 )
1289 .await?;
1290
1291 tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1292
1293 Ok(())
1294}
1295
1296async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[(JobId, SystemTime)]) -> Result<()> {
1297 if jobs.is_empty() {
1298 return Ok(());
1299 }
1300
1301 let mut col_job_id = Vec::with_capacity(jobs.len());
1302 let mut col_inactive_since = Vec::with_capacity(jobs.len());
1303
1304 for (job, inactive_since) in jobs {
1305 col_job_id.push(job.0);
1306 col_inactive_since.push(systemtime_to_ts(*inactive_since));
1307 }
1308
1309 {
1310 let stmt = tx
1311 .prepare(
1312 r#"--sql
1313 UPDATE ora.job
1314 SET inactive_since = to_timestamp(t.inactive_since)
1315 FROM
1316 UNNEST(
1317 $1::UUID[],
1318 $2::DOUBLE PRECISION[]
1319 ) AS t(job_id, inactive_since)
1320 WHERE
1321 id = t.job_id;
1322 "#,
1323 )
1324 .await?;
1325
1326 tx.execute(&stmt, &[&col_job_id, &col_inactive_since])
1327 .await?;
1328 }
1329
1330 {
1331 let stmt = tx
1332 .prepare(
1333 r#"--sql
1334 UPDATE ora.schedule
1335 SET
1336 active_job_id = NULL
1337 WHERE active_job_id = ANY($1::UUID[])
1338 "#,
1339 )
1340 .await?;
1341
1342 tx.execute(&stmt, &[&col_job_id]).await?;
1343 }
1344
1345 Ok(())
1346}