Skip to main content

ora_backend_postgres/
lib.rs

1//! Postgres backend implementation for Ora.
2#![allow(missing_docs)]
3
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use deadpool_postgres::{Pool, PoolError};
7use futures::Stream;
8use ora_backend::{
9    Backend,
10    common::{NextPageToken, TimeRange},
11    executions::{
12        ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
13        SucceededExecution,
14    },
15    executors::ExecutorId,
16    jobs::{
17        AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
18        NewJob,
19    },
20    schedules::{
21        AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
22        ScheduleId, ScheduleOrderBy, StoppedSchedule,
23    },
24};
25
26use refinery::embed_migrations;
27use thiserror::Error;
28use uuid::Uuid;
29
30use crate::{
31    db::{DbPool, DbTransaction},
32    query::{
33        jobs::{cancel_jobs, job_count},
34        schedules::schedule_count,
35    },
36};
37
38embed_migrations!("./migrations");
39
40mod db;
41mod models;
42mod query;
43
44/// Postgres backend for Ora.
45#[must_use]
46pub struct PostgresBackend {
47    pool: DbPool,
48    delete_batch_size: usize,
49}
50
51type Result<T> = core::result::Result<T, Error>;
52
53/// Errors that can occur when using the Postgres backend.
54#[derive(Error, Debug)]
55pub enum Error {
56    /// An error occurred during database migrations.
57    #[error("{0}")]
58    Migrations(#[from] refinery::Error),
59    /// An error occurred while acquiring a database connection from the pool.
60    #[error("{0}")]
61    Pool(#[from] PoolError),
62    /// An error occurred during a database operation.
63    #[error("{0}")]
64    Postgres(#[from] tokio_postgres::Error),
65    /// An error occurred while serializing or deserializing JSON data.
66    #[error("{0}")]
67    Serde(#[from] serde_json::Error),
68    /// An error occurred while converting a value to or from a database type.
69    #[error("invalid page token: {0}")]
70    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
71}
72
73impl PostgresBackend {
74    /// Create and initialize a new Postgres backend
75    /// with the given connection pool.
76    pub async fn new(pool: Pool) -> Result<Self> {
77        let mut conn = pool.get().await?;
78        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
79        migrations::runner()
80            .set_migration_table_name("ora.migrations")
81            .run_async(&mut **conn)
82            .await?;
83        Ok(Self {
84            pool: DbPool(pool),
85            delete_batch_size: 40_000,
86        })
87    }
88
89    /// Set the batch size for all delete operations.
90    ///
91    /// # Panics
92    ///
93    /// Panics if `batch_size` is zero.
94    pub fn with_delete_batch_size(mut self, batch_size: usize) -> Self {
95        assert!(batch_size > 0, "batch size must be greater than zero");
96        self.delete_batch_size = batch_size;
97        self
98    }
99}
100
101impl Backend for PostgresBackend {
102    type Error = Error;
103
104    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
105        if job_types.is_empty() {
106            return Ok(());
107        }
108
109        let mut conn = self.pool.get().await?;
110
111        let tx = conn.transaction().await?;
112
113        let mut col_id = Vec::with_capacity(job_types.len());
114        let mut col_description = Vec::with_capacity(job_types.len());
115        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
116        let mut col_output_schema_json = Vec::with_capacity(job_types.len());
117
118        for job_type in job_types {
119            col_id.push(job_type.id.as_str());
120            col_description.push(job_type.description.as_deref());
121            col_input_schema_json.push(job_type.input_schema_json.as_deref());
122            col_output_schema_json.push(job_type.output_schema_json.as_deref());
123        }
124
125        {
126            let stmt = tx
127                .prepare(
128                    r#"--sql
129                    INSERT INTO ora.job_type (
130                        id,
131                        description,
132                        input_schema_json,
133                        output_schema_json
134                    ) SELECT * FROM UNNEST(
135                        $1::TEXT[],
136                        $2::TEXT[],
137                        $3::TEXT[],
138                        $4::TEXT[]
139                    ) ON CONFLICT (id) DO UPDATE SET
140                        description = EXCLUDED.description,
141                        input_schema_json = EXCLUDED.input_schema_json,
142                        output_schema_json = EXCLUDED.output_schema_json
143                    WHERE
144                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
145                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
146                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
147                    "#,
148                )
149                .await?;
150
151            tx.execute(
152                &stmt,
153                &[
154                    &col_id,
155                    &col_description,
156                    &col_input_schema_json,
157                    &col_output_schema_json,
158                ],
159            )
160            .await?;
161        }
162
163        tx.commit().await?;
164
165        Ok(())
166    }
167
168    async fn list_job_types(&self) -> Result<Vec<JobType>> {
169        let mut conn = self.pool.get().await?;
170
171        let tx = conn.read_only_transaction().await?;
172
173        let job_types = {
174            let stmt = tx
175                .prepare(
176                    r#"--sql
177                    SELECT
178                        id,
179                        description,
180                        input_schema_json,
181                        output_schema_json
182                    FROM
183                        ora.job_type
184                    ORDER BY id
185                    "#,
186                )
187                .await?;
188
189            let rows = tx.query(&stmt, &[]).await?;
190
191            rows.into_iter()
192                .map(|row| {
193                    Result::<_>::Ok(JobType {
194                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
195                        description: row.try_get(1)?,
196                        input_schema_json: row.try_get(2)?,
197                        output_schema_json: row.try_get(3)?,
198                    })
199                })
200                .collect::<Result<Vec<_>>>()?
201        };
202
203        tx.commit().await?;
204
205        Ok(job_types)
206    }
207
208    async fn add_jobs(
209        &self,
210        jobs: &[NewJob],
211        if_not_exists: Option<JobFilters>,
212    ) -> Result<AddedJobs> {
213        if jobs.is_empty() {
214            return Ok(AddedJobs::Added(Vec::new()));
215        }
216
217        let mut conn = self.pool.get().await?;
218
219        let tx = conn.transaction().await?;
220
221        if let Some(filters) = if_not_exists {
222            let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;
223
224            if !existing_job_ids.is_empty() {
225                tx.commit().await?;
226                return Ok(AddedJobs::Existing(existing_job_ids));
227            }
228        }
229
230        let mut col_id = Vec::with_capacity(jobs.len());
231        let mut col_schedule_id = Vec::with_capacity(jobs.len());
232
233        {
234            let mut col_job_type_id = Vec::with_capacity(jobs.len());
235            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
236            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
237            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
238            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());
239
240            for job in jobs {
241                col_id.push(Uuid::now_v7());
242                col_job_type_id.push(job.job.job_type_id.as_str());
243                col_target_execution_time.push(job.job.target_execution_time);
244                col_input_payload_json.push(job.job.input_payload_json.as_str());
245                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
246                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
247                col_schedule_id.push(job.schedule_id.map(|s| s.0));
248            }
249
250            let stmt = tx
251                .prepare(
252                    r#"--sql
253                    INSERT INTO ora.job (
254                        id,
255                        job_type_id,
256                        target_execution_time,
257                        input_payload_json,
258                        timeout_policy_json,
259                        retry_policy_json,
260                        schedule_id
261                    ) SELECT * FROM UNNEST(
262                        $1::UUID[],
263                        $2::TEXT[],
264                        $3::TIMESTAMPTZ[],
265                        $4::TEXT[],
266                        $5::TEXT[],
267                        $6::TEXT[],
268                        $7::UUID[]
269                    )
270                    "#,
271                )
272                .await?;
273
274            tx.execute(
275                &stmt,
276                &[
277                    &col_id,
278                    &col_job_type_id,
279                    &col_target_execution_time,
280                    &col_input_payload_json,
281                    &col_timeout_policy_json,
282                    &col_retry_policy_json,
283                    &col_schedule_id,
284                ],
285            )
286            .await?;
287        }
288
289        {
290            let mut col_job_id = Vec::with_capacity(jobs.len());
291            let mut col_job_label_key = Vec::with_capacity(jobs.len());
292            let mut col_job_label_value = Vec::with_capacity(jobs.len());
293
294            for (i, job) in jobs.iter().enumerate() {
295                for label in &job.job.labels {
296                    col_job_id.push(col_id[i]);
297                    col_job_label_key.push(label.key.as_str());
298                    col_job_label_value.push(label.value.as_str());
299                }
300            }
301
302            let stmt = tx
303                .prepare(
304                    r#"--sql
305                    INSERT INTO ora.job_label (
306                        job_id,
307                        label_key,
308                        label_value
309                    ) SELECT * FROM UNNEST(
310                        $1::UUID[],
311                        $2::TEXT[],
312                        $3::TEXT[]
313                    )
314                    "#,
315                )
316                .await?;
317
318            tx.execute(
319                &stmt,
320                &[&col_job_id, &col_job_label_key, &col_job_label_value],
321            )
322            .await?;
323        }
324
325        {
326            let stmt = tx
327                .prepare(
328                    r#"--sql
329                    UPDATE ora.schedule
330                    SET
331                        active_job_id = t.job_id
332                    FROM UNNEST(
333                        $1::UUID[],
334                        $2::UUID[]
335                    ) AS t(job_id, schedule_id)
336                    WHERE
337                        ora.schedule.id = t.schedule_id
338                    "#,
339                )
340                .await?;
341
342            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
343        }
344
345        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
346        add_executions(&tx, &job_ids).await?;
347
348        tx.commit().await?;
349
350        Ok(AddedJobs::Added(job_ids))
351    }
352
353    async fn list_jobs(
354        &self,
355        filters: JobFilters,
356        order_by: Option<JobOrderBy>,
357        page_size: u32,
358        page_token: Option<NextPageToken>,
359    ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
360        let mut conn = self.pool.get().await?;
361        let tx = conn.read_only_transaction().await?;
362
363        let (jobs, next_page_token) = query::jobs::job_details(
364            &tx,
365            filters,
366            order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
367            page_size,
368            page_token.map(|t| t.0),
369        )
370        .await?;
371
372        tx.commit().await?;
373
374        Ok((jobs, next_page_token))
375    }
376
377    async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
378        let mut conn = self.pool.get().await?;
379        let tx = conn.read_only_transaction().await?;
380        let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
381        tx.commit().await?;
382        Ok(count)
383    }
384
385    async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
386        let mut conn = self.pool.get().await?;
387        let tx = conn.transaction().await?;
388        let now = std::time::SystemTime::now();
389
390        let jobs = cancel_jobs(&tx, filters).await?;
391        mark_jobs_inactive(
392            &tx,
393            &jobs.iter().map(|j| (j.job_id, now)).collect::<Vec<_>>(),
394        )
395        .await?;
396        tx.commit().await?;
397
398        Ok(jobs)
399    }
400
401    async fn add_schedules(
402        &self,
403        schedules: &[ScheduleDefinition],
404        if_not_exists: Option<ScheduleFilters>,
405    ) -> Result<AddedSchedules> {
406        if schedules.is_empty() {
407            return Ok(AddedSchedules::Added(Vec::new()));
408        }
409
410        let mut conn = self.pool.get().await?;
411        let tx = conn.transaction().await?;
412
413        if let Some(filters) = if_not_exists {
414            let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;
415
416            if !existing_schedule_ids.is_empty() {
417                tx.commit().await?;
418                return Ok(AddedSchedules::Existing(existing_schedule_ids));
419            }
420        }
421
422        let mut col_id = Vec::with_capacity(schedules.len());
423        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
424        let mut col_job_template_json = Vec::with_capacity(schedules.len());
425        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
426        let mut col_start_after = Vec::with_capacity(schedules.len());
427        let mut col_end_before = Vec::with_capacity(schedules.len());
428
429        for schedule in schedules {
430            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);
431
432            col_id.push(Uuid::now_v7());
433            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
434            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
435            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
436            col_start_after.push(start_after);
437            col_end_before.push(end_before);
438        }
439
440        {
441            let stmt = tx
442                .prepare(
443                    r#"--sql
444                    INSERT INTO ora.schedule (
445                        id,
446                        job_template_job_type_id,
447                        job_template_json,
448                        scheduling_policy_json,
449                        start_after,
450                        end_before
451                    ) SELECT * FROM UNNEST(
452                        $1::UUID[],
453                        $2::TEXT[],
454                        $3::TEXT[],
455                        $4::TEXT[],
456                        $5::TIMESTAMPTZ[],
457                        $6::TIMESTAMPTZ[]
458                    )
459                    "#,
460                )
461                .await?;
462
463            tx.execute(
464                &stmt,
465                &[
466                    &col_id,
467                    &col_job_template_job_type_id,
468                    &col_job_template_json,
469                    &col_scheduling_policy_json,
470                    &col_start_after,
471                    &col_end_before,
472                ],
473            )
474            .await?;
475        }
476
477        {
478            let mut col_schedule_id = Vec::with_capacity(schedules.len());
479            let mut col_schedule_label_key = Vec::with_capacity(schedules.len());
480            let mut col_schedule_label_value = Vec::with_capacity(schedules.len());
481
482            for (i, schedule) in schedules.iter().enumerate() {
483                for label in &schedule.labels {
484                    col_schedule_id.push(col_id[i]);
485                    col_schedule_label_key.push(label.key.as_str());
486                    col_schedule_label_value.push(label.value.as_str());
487                }
488            }
489
490            let stmt = tx
491                .prepare(
492                    r#"--sql
493                    INSERT INTO ora.schedule_label (
494                        schedule_id,
495                        label_key,
496                        label_value
497                    ) SELECT * FROM UNNEST(
498                        $1::UUID[],
499                        $2::TEXT[],
500                        $3::TEXT[]
501                    )
502                    "#,
503                )
504                .await?;
505
506            tx.execute(
507                &stmt,
508                &[
509                    &col_schedule_id,
510                    &col_schedule_label_key,
511                    &col_schedule_label_value,
512                ],
513            )
514            .await?;
515        }
516
517        tx.commit().await?;
518
519        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();
520
521        Ok(AddedSchedules::Added(schedule_ids))
522    }
523
524    async fn list_schedules(
525        &self,
526        filters: ScheduleFilters,
527        order_by: Option<ScheduleOrderBy>,
528        page_size: u32,
529        page_token: Option<NextPageToken>,
530    ) -> Result<(
531        Vec<ScheduleDetails>,
532        Option<ora_backend::common::NextPageToken>,
533    )> {
534        let mut conn = self.pool.get().await?;
535        let tx = conn.read_only_transaction().await?;
536
537        let (schedules, next_page_token) = query::schedules::schedule_details(
538            &tx,
539            filters,
540            order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
541            page_size,
542            page_token.map(|t| t.0),
543        )
544        .await?;
545
546        tx.commit().await?;
547
548        Ok((schedules, next_page_token))
549    }
550
551    async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
552        let mut conn = self.pool.get().await?;
553        let tx = conn.read_only_transaction().await?;
554        let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
555        tx.commit().await?;
556        Ok(count)
557    }
558
559    async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
560        let mut conn = self.pool.get().await?;
561        let tx = conn.transaction().await?;
562        let schedules = query::schedules::stop_schedules(&tx, filters).await?;
563        tx.commit().await?;
564        Ok(schedules)
565    }
566
567    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
568        async_stream::try_stream!({
569            let mut last_execution_id: Option<ExecutionId> = None;
570
571            loop {
572                let mut conn = self.pool.get().await?;
573                let tx = conn.read_only_transaction().await?;
574
575                let rows = if let Some(last_execution_id) = last_execution_id {
576                    let stmt = tx
577                        .prepare(
578                            r#"--sql
579                            SELECT
580                                ora.execution.id,
581                                ora.job.id,
582                                ora.job.job_type_id,
583                                ora.job.input_payload_json,
584                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
585                                ora.job.retry_policy_json,
586                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
587                            FROM
588                                ora.execution
589                            JOIN ora.job ON
590                                ora.execution.job_id = ora.job.id
591                            WHERE
592                                status = 0
593                                AND ora.job.target_execution_time <= NOW()
594                                AND ora.execution.id > $1::UUID
595                            ORDER BY
596                                ora.execution.id ASC
597                            LIMIT 1000
598                            "#,
599                        )
600                        .await?;
601
602                    tx.query(&stmt, &[&last_execution_id.0]).await?
603                } else {
604                    let stmt = tx
605                        .prepare(
606                            r#"--sql
607                            SELECT
608                                ora.execution.id,
609                                ora.job.id,
610                                ora.job.job_type_id,
611                                ora.job.input_payload_json,
612                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
613                                ora.job.retry_policy_json,
614                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
615                            FROM
616                                ora.execution
617                            JOIN ora.job ON
618                                ora.execution.job_id = ora.job.id
619                            WHERE
620                                status = 0
621                                AND ora.job.target_execution_time <= NOW()
622                            ORDER BY
623                                ora.execution.id ASC
624                            LIMIT 1000
625                            "#,
626                        )
627                        .await?;
628
629                    tx.query(&stmt, &[]).await?
630                };
631
632                tx.commit().await?;
633
634                if rows.is_empty() {
635                    break;
636                }
637
638                let mut ready_executions = Vec::with_capacity(rows.len());
639
640                for row in rows {
641                    ready_executions.push(ReadyExecution {
642                        execution_id: ExecutionId(row.try_get(0)?),
643                        job_id: JobId(row.try_get(1)?),
644                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
645                        input_payload_json: row.try_get(3)?,
646                        attempt_number: row.try_get::<_, i64>(4)? as u64,
647                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
648                        target_execution_time: UNIX_EPOCH
649                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
650                    });
651                }
652
653                last_execution_id = ready_executions.last().map(|e| e.execution_id);
654
655                yield ready_executions;
656            }
657        })
658    }
659
660    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
661        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();
662
663        loop {
664            let mut conn = self.pool.get().await?;
665
666            let next_timestamp = {
667                let tx = conn.read_only_transaction().await?;
668
669                let stmt = tx
670                    .prepare(
671                        r#"--sql
672                        SELECT
673                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
674                        FROM
675                            ora.job
676                        JOIN ora.execution ON
677                            ora.execution.job_id = ora.job.id
678                        WHERE
679                            ora.execution.status = 0
680                            AND NOT (ora.execution.id = ANY($1::UUID[]))
681                        ORDER BY
682                            ora.job.target_execution_time ASC
683                        LIMIT 1
684                        "#,
685                    )
686                    .await?;
687
688                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;
689
690                tx.commit().await?;
691
692                match row {
693                    Some(row) => row
694                        .try_get::<_, Option<f64>>(0)?
695                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
696                    None => None,
697                }
698            };
699
700            drop(conn);
701
702            if let Some(next_timestamp) = next_timestamp {
703                let now = std::time::SystemTime::now();
704
705                if next_timestamp <= now {
706                    break;
707                }
708
709                tokio::time::sleep(
710                    next_timestamp
711                        .duration_since(now)
712                        .unwrap_or_else(|_| Duration::from_secs(0)),
713                )
714                .await;
715                break;
716            }
717
718            tokio::time::sleep(Duration::from_millis(500)).await;
719        }
720
721        Ok(())
722    }
723
724    fn in_progress_executions(
725        &self,
726    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
727        async_stream::try_stream!({
728            let mut last_execution_id: Option<ExecutionId> = None;
729
730            loop {
731                let mut conn = self.pool.get().await?;
732                let tx = conn.read_only_transaction().await?;
733
734                let rows = if let Some(last_execution_id) = last_execution_id {
735                    let stmt = tx
736                        .prepare(
737                            r#"--sql
738                            SELECT
739                                ora.execution.id,
740                                ora.job.id,
741                                ora.execution.executor_id,
742                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
743                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
744                                ora.job.timeout_policy_json,
745                                ora.job.retry_policy_json,
746                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
747                            FROM
748                                ora.execution
749                            JOIN ora.job ON
750                                ora.execution.job_id = ora.job.id
751                            WHERE
752                                status = 1
753                                AND ora.execution.id > $1::UUID
754                            ORDER BY
755                                ora.execution.id ASC
756                            LIMIT 1000
757                            "#,
758                        )
759                        .await?;
760
761                    tx.query(&stmt, &[&last_execution_id.0]).await?
762                } else {
763                    let stmt = tx
764                        .prepare(
765                            r#"--sql
766                            SELECT
767                                ora.execution.id,
768                                ora.job.id,
769                                ora.execution.executor_id,
770                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
771                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
772                                ora.job.timeout_policy_json,
773                                ora.job.retry_policy_json,
774                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
775                            FROM
776                                ora.execution
777                            JOIN ora.job ON
778                                ora.execution.job_id = ora.job.id
779                            WHERE
780                                status = 1
781                            ORDER BY
782                                ora.execution.id ASC
783                            LIMIT 1000
784                        "#,
785                        )
786                        .await?;
787
788                    tx.query(&stmt, &[]).await?
789                };
790
791                tx.commit().await?;
792
793                if rows.is_empty() {
794                    break;
795                }
796
797                let mut in_progress_executions = Vec::with_capacity(rows.len());
798
799                for row in rows {
800                    in_progress_executions.push(InProgressExecution {
801                        execution_id: ExecutionId(row.try_get(0)?),
802                        job_id: JobId(row.try_get(1)?),
803                        executor_id: ExecutorId(row.try_get(2)?),
804                        target_execution_time: UNIX_EPOCH
805                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
806                        started_at: UNIX_EPOCH
807                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
808                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
809                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
810                        attempt_number: row.try_get::<_, i64>(7)? as u64,
811                    });
812                }
813
814                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);
815
816                yield in_progress_executions;
817            }
818        })
819    }
820
821    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
822        if executions.is_empty() {
823            return Ok(());
824        }
825
826        let mut conn = self.pool.get().await?;
827
828        let tx = conn.transaction().await?;
829
830        let stmt = tx
831            .prepare(
832                r#"--sql
833                UPDATE ora.execution
834                SET
835                    executor_id = t.executor_id,
836                    started_at = to_timestamp(t.started_at)
837                FROM UNNEST(
838                    $1::UUID[],
839                    $2::UUID[],
840                    $3::DOUBLE PRECISION[]
841                ) AS t(execution_id, executor_id, started_at)
842                WHERE
843                    execution_id = id
844                    AND ora.execution.started_at IS NULL
845                "#,
846            )
847            .await?;
848
849        let mut col_execution_id = Vec::with_capacity(executions.len());
850        let mut col_executor_id = Vec::with_capacity(executions.len());
851        let mut col_started_at = Vec::with_capacity(executions.len());
852
853        for execution in executions {
854            col_execution_id.push(execution.execution_id.0);
855            col_executor_id.push(execution.executor_id.0);
856            col_started_at.push(
857                execution
858                    .started_at
859                    .duration_since(UNIX_EPOCH)
860                    .unwrap_or_default()
861                    .as_secs_f64(),
862            );
863        }
864
865        tx.execute(
866            &stmt,
867            &[&col_execution_id, &col_executor_id, &col_started_at],
868        )
869        .await?;
870
871        tx.commit().await?;
872
873        Ok(())
874    }
875
876    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
877        if executions.is_empty() {
878            return Ok(());
879        }
880
881        let mut conn = self.pool.get().await?;
882
883        let tx = conn.transaction().await?;
884
885        let stmt = tx
886            .prepare(
887                r#"--sql
888                UPDATE ora.execution
889                SET
890                    succeeded_at = to_timestamp(t.succeeded_at),
891                    output_json = t.output_json
892                FROM UNNEST(
893                    $1::UUID[],
894                    $2::DOUBLE PRECISION[],
895                    $3::TEXT[]
896                ) AS t(execution_id, succeeded_at, output_json)
897                WHERE
898                    execution_id = id
899                    AND status < 2
900                RETURNING job_id, t.succeeded_at
901                "#,
902            )
903            .await?;
904
905        let mut col_execution_id = Vec::with_capacity(executions.len());
906        let mut col_succeeded_at = Vec::with_capacity(executions.len());
907        let mut col_output_json = Vec::with_capacity(executions.len());
908
909        for execution in executions {
910            col_execution_id.push(execution.execution_id.0);
911            col_succeeded_at.push(
912                execution
913                    .succeeded_at
914                    .duration_since(UNIX_EPOCH)
915                    .unwrap_or_default()
916                    .as_secs_f64(),
917            );
918            col_output_json.push(execution.output_json.as_str());
919        }
920
921        let rows = tx
922            .query(
923                &stmt,
924                &[&col_execution_id, &col_succeeded_at, &col_output_json],
925            )
926            .await?;
927
928        let mut jobs = Vec::with_capacity(rows.len());
929        for row in rows {
930            jobs.push((
931                JobId(row.try_get(0)?),
932                UNIX_EPOCH + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(1)?),
933            ));
934        }
935
936        mark_jobs_inactive(&tx, &jobs).await?;
937
938        tx.commit().await?;
939
940        Ok(())
941    }
942
943    async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
944        if executions.is_empty() {
945            return Ok(());
946        }
947
948        let mut conn = self.pool.get().await?;
949        let tx = conn.transaction().await?;
950
951        let jobs = executions_failed(&tx, executions).await?;
952
953        mark_jobs_inactive(&tx, &jobs).await?;
954
955        tx.commit().await?;
956
957        Ok(())
958    }
959
960    async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
961        if executions.is_empty() {
962            return Ok(());
963        }
964
965        let mut conn = self.pool.get().await?;
966        let tx = conn.transaction().await?;
967
968        let jobs = executions_failed(&tx, executions).await?;
969        add_executions(
970            &tx,
971            &jobs.into_iter().map(|(id, ..)| id).collect::<Vec<_>>(),
972        )
973        .await?;
974
975        tx.commit().await?;
976
977        Ok(())
978    }
979
980    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
981        async_stream::try_stream!({
982            let mut last_schedule_id: Option<ScheduleId> = None;
983            loop {
984                let mut conn = self.pool.get().await?;
985                let tx = conn.read_only_transaction().await?;
986
987                let rows = if let Some(last_schedule_id) = last_schedule_id {
988                    let stmt = tx
989                        .prepare(
990                            r#"--sql
991                            SELECT
992                                id,
993                                EXTRACT(EPOCH FROM(
994                                    SELECT
995                                        target_execution_time
996                                    FROM
997                                        ora.job
998                                    WHERE
999                                        ora.job.schedule_id = ora.schedule.id
1000                                    ORDER BY ora.job.id DESC
1001                                    LIMIT 1
1002                                ))::DOUBLE PRECISION,
1003                                scheduling_policy_json,
1004                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
1005                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
1006                                job_template_json
1007                            FROM
1008                                ora.schedule
1009                            WHERE
1010                                (
1011                                    ora.schedule.stopped_at IS NULL
1012                                    AND ora.schedule.active_job_id IS NULL
1013                                )
1014                                AND id > $1::UUID
1015                            ORDER BY id ASC
1016                            LIMIT 1000
1017                            "#,
1018                        )
1019                        .await?;
1020
1021                    tx.query(&stmt, &[&last_schedule_id.0]).await?
1022                } else {
1023                    let stmt = tx
1024                        .prepare(
1025                            r#"--sql
1026                            SELECT
1027                                id,
1028                                EXTRACT(EPOCH FROM(
1029                                    SELECT
1030                                        target_execution_time
1031                                    FROM
1032                                        ora.job
1033                                    WHERE
1034                                        ora.job.schedule_id = ora.schedule.id
1035                                    ORDER BY ora.job.id DESC
1036                                    LIMIT 1
1037                                ))::DOUBLE PRECISION,
1038                                scheduling_policy_json,
1039                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
1040                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
1041                                job_template_json
1042                            FROM
1043                                ora.schedule
1044                            WHERE
1045                                ora.schedule.stopped_at IS NULL
1046                                AND ora.schedule.active_job_id IS NULL
1047                            ORDER BY id ASC
1048                            LIMIT 1000
1049                            "#,
1050                        )
1051                        .await?;
1052
1053                    tx.query(&stmt, &[]).await?
1054                };
1055
1056                tx.commit().await?;
1057
1058                if rows.is_empty() {
1059                    break;
1060                }
1061
1062                let mut schedules = Vec::with_capacity(rows.len());
1063
1064                for row in rows {
1065                    schedules.push(PendingSchedule {
1066                        schedule_id: ScheduleId(row.try_get(0)?),
1067                        last_target_execution_time: row
1068                            .try_get::<_, Option<f64>>(1)?
1069                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1070                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
1071                        time_range: TimeRange {
1072                            start: row
1073                                .try_get::<_, Option<f64>>(3)?
1074                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1075                            end: row
1076                                .try_get::<_, Option<f64>>(4)?
1077                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1078                        },
1079                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
1080                    });
1081                }
1082
1083                last_schedule_id = schedules.last().map(|s| s.schedule_id);
1084                yield schedules;
1085            }
1086        })
1087    }
1088
1089    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1090        let batch_size = i64::try_from(self.delete_batch_size).unwrap_or(i64::MAX);
1091
1092        let mut conn = self.pool.get().await?;
1093
1094        // delete in a loop so that we don't lock
1095        // tables for too long
1096        loop {
1097            let mut deleted_rows = 0;
1098
1099            {
1100                let tx = conn.transaction().await?;
1101
1102                let stmt = tx
1103                    .prepare(
1104                        r#"--sql
1105                        WITH cte AS (
1106                            SELECT ctid
1107                            FROM ora.job
1108                            WHERE inactive_since < to_timestamp($1)
1109                            ORDER BY inactive_since
1110                            LIMIT $2
1111                        )
1112                        DELETE FROM ora.job j
1113                        USING cte
1114                        WHERE j.ctid = cte.ctid;
1115                        "#,
1116                    )
1117                    .await?;
1118
1119                let deleted = tx
1120                    .execute(
1121                        &stmt,
1122                        &[
1123                            &before
1124                                .duration_since(UNIX_EPOCH)
1125                                .unwrap_or_default()
1126                                .as_secs_f64(),
1127                            &batch_size,
1128                        ],
1129                    )
1130                    .await?;
1131
1132                deleted_rows += deleted;
1133
1134                tx.commit().await?;
1135            }
1136
1137            {
1138                let tx = conn.transaction().await?;
1139                let stmt = tx
1140                    .prepare(
1141                        r#"--sql
1142                        WITH cte AS (
1143                            SELECT ctid
1144                            FROM ora.schedule
1145                            WHERE stopped_at < to_timestamp($1)
1146                            ORDER BY stopped_at
1147                            LIMIT $2
1148                        )
1149                        DELETE FROM ora.schedule s
1150                        USING cte
1151                        WHERE s.ctid = cte.ctid;
1152                        "#,
1153                    )
1154                    .await?;
1155
1156                let deleted = tx
1157                    .execute(
1158                        &stmt,
1159                        &[
1160                            &before
1161                                .duration_since(UNIX_EPOCH)
1162                                .unwrap_or_default()
1163                                .as_secs_f64(),
1164                            &batch_size,
1165                        ],
1166                    )
1167                    .await?;
1168
1169                tx.commit().await?;
1170
1171                deleted_rows += deleted;
1172            }
1173
1174            {
1175                let tx = conn.transaction().await?;
1176
1177                let stmt = tx
1178                    .prepare(
1179                        r#"--sql
1180                        DELETE FROM ora.job_type
1181                        WHERE
1182                            NOT EXISTS (
1183                                SELECT FROM ora.job
1184                                WHERE
1185                                    ora.job.job_type_id = ora.job_type.id
1186                            )
1187                            AND NOT EXISTS (
1188                                SELECT FROM ora.schedule
1189                                WHERE
1190                                    ora.schedule.job_template_job_type_id = ora.job_type.id                                
1191                            )
1192                        "#,
1193                    )
1194                    .await?;
1195
1196                tx.execute(&stmt, &[]).await?;
1197
1198                tx.commit().await?;
1199            }
1200
1201            if deleted_rows == 0 {
1202                break;
1203            }
1204        }
1205
1206        Ok(())
1207    }
1208
1209    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1210        loop {
1211            let mut conn = self.pool.get().await?;
1212
1213            let has_pending = {
1214                let tx = conn.read_only_transaction().await?;
1215
1216                let stmt = tx
1217                    .prepare(
1218                        r#"--sql
1219                        SELECT 1 FROM ora.schedule
1220                        WHERE
1221                            ora.schedule.stopped_at IS NULL
1222                            AND ora.schedule.active_job_id IS NULL
1223                        LIMIT 1
1224                        "#,
1225                    )
1226                    .await?;
1227
1228                let row = tx.query_opt(&stmt, &[]).await?;
1229
1230                tx.commit().await?;
1231
1232                row.is_some()
1233            };
1234
1235            drop(conn);
1236
1237            if has_pending {
1238                break;
1239            }
1240
1241            tokio::time::sleep(Duration::from_millis(500)).await;
1242        }
1243
1244        Ok(())
1245    }
1246}
1247
1248async fn executions_failed(
1249    tx: &DbTransaction<'_>,
1250    executions: &[FailedExecution],
1251) -> Result<Vec<(JobId, SystemTime)>> {
1252    let stmt = tx
1253        .prepare(
1254            r#"--sql
1255            UPDATE ora.execution
1256            SET
1257                failed_at = to_timestamp(t.failed_at),
1258                failure_reason = t.failure_reason
1259            FROM UNNEST(
1260                $1::UUID[],
1261                $2::DOUBLE PRECISION[],
1262                $3::TEXT[]
1263            ) AS t(execution_id, failed_at, failure_reason)
1264            WHERE
1265                execution_id = id
1266                AND status < 2
1267            RETURNING job_id, t.failed_at
1268            "#,
1269        )
1270        .await?;
1271
1272    let mut col_execution_id = Vec::with_capacity(executions.len());
1273    let mut col_succeeded_at = Vec::with_capacity(executions.len());
1274    let mut col_failure_reason = Vec::with_capacity(executions.len());
1275
1276    for execution in executions {
1277        col_execution_id.push(execution.execution_id.0);
1278        col_succeeded_at.push(
1279            execution
1280                .failed_at
1281                .duration_since(UNIX_EPOCH)
1282                .unwrap_or_default()
1283                .as_secs_f64(),
1284        );
1285        col_failure_reason.push(execution.failure_reason.as_str());
1286    }
1287
1288    let rows = tx
1289        .query(
1290            &stmt,
1291            &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1292        )
1293        .await?;
1294
1295    let mut jobs = Vec::with_capacity(rows.len());
1296
1297    for row in rows {
1298        jobs.push((
1299            JobId(row.try_get(0)?),
1300            UNIX_EPOCH + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(1)?),
1301        ));
1302    }
1303
1304    Ok(jobs)
1305}
1306
1307async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1308    if jobs.is_empty() {
1309        return Ok(());
1310    }
1311
1312    let mut col_job_id = Vec::with_capacity(jobs.len());
1313    let mut col_execution_id = Vec::with_capacity(jobs.len());
1314
1315    for job in jobs {
1316        col_job_id.push(job.0);
1317        col_execution_id.push(Uuid::now_v7());
1318    }
1319
1320    let stmt = tx
1321        .prepare(
1322            r#"--sql
1323            INSERT INTO ora.execution (
1324                id,
1325                job_id
1326            ) SELECT * FROM UNNEST(
1327                $1::UUID[],
1328                $2::UUID[]
1329            )
1330            "#,
1331        )
1332        .await?;
1333
1334    tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1335
1336    Ok(())
1337}
1338
1339async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[(JobId, SystemTime)]) -> Result<()> {
1340    if jobs.is_empty() {
1341        return Ok(());
1342    }
1343
1344    let mut col_job_id = Vec::with_capacity(jobs.len());
1345    let mut col_inactive_since = Vec::with_capacity(jobs.len());
1346
1347    for (job, inactive_since) in jobs {
1348        col_job_id.push(job.0);
1349        col_inactive_since.push(
1350            inactive_since
1351                .duration_since(UNIX_EPOCH)
1352                .unwrap_or_default()
1353                .as_secs_f64(),
1354        );
1355    }
1356
1357    {
1358        let stmt = tx
1359            .prepare(
1360                r#"--sql
1361                UPDATE ora.job
1362                SET inactive_since = to_timestamp(t.inactive_since)
1363                FROM
1364                    UNNEST(
1365                        $1::UUID[],
1366                        $2::DOUBLE PRECISION[]
1367                    ) AS t(job_id, inactive_since)
1368                WHERE
1369                    id = t.job_id;
1370                "#,
1371            )
1372            .await?;
1373
1374        tx.execute(&stmt, &[&col_job_id, &col_inactive_since])
1375            .await?;
1376    }
1377
1378    {
1379        let stmt = tx
1380            .prepare(
1381                r#"--sql
1382                UPDATE ora.schedule
1383                SET
1384                    active_job_id = NULL
1385                WHERE active_job_id = ANY($1::UUID[])
1386                "#,
1387            )
1388            .await?;
1389
1390        tx.execute(&stmt, &[&col_job_id]).await?;
1391    }
1392
1393    Ok(())
1394}