Skip to main content

ora_backend_postgres/
lib.rs

1//! Postgres backend implementation for Ora.
2#![allow(missing_docs)]
3
4use std::time::{Duration, SystemTime};
5
6use deadpool_postgres::{Pool, PoolError};
7use futures::Stream;
8use ora_backend::{
9    Backend,
10    common::{NextPageToken, TimeRange},
11    executions::{
12        ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
13        SucceededExecution,
14    },
15    executors::ExecutorId,
16    jobs::{
17        AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
18        NewJob,
19    },
20    schedules::{
21        AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
22        ScheduleId, ScheduleOrderBy, StoppedSchedule,
23    },
24};
25
26use refinery::embed_migrations;
27use thiserror::Error;
28use uuid::Uuid;
29
30use crate::{
31    db::{DbPool, DbTransaction},
32    query::{
33        jobs::{cancel_jobs, job_count},
34        schedules::schedule_count,
35    },
36    util::{systemtime_from_ts, systemtime_to_ts},
37};
38
39embed_migrations!("./migrations");
40
41mod db;
42mod models;
43mod query;
44mod util;
45
46/// Postgres backend for Ora.
47#[must_use]
48pub struct PostgresBackend {
49    pool: DbPool,
50    delete_batch_size: usize,
51}
52
53type Result<T> = core::result::Result<T, Error>;
54
55/// Errors that can occur when using the Postgres backend.
56#[derive(Error, Debug)]
57pub enum Error {
58    /// An error occurred during database migrations.
59    #[error("{0}")]
60    Migrations(#[from] refinery::Error),
61    /// An error occurred while acquiring a database connection from the pool.
62    #[error("{0}")]
63    Pool(#[from] PoolError),
64    /// An error occurred during a database operation.
65    #[error("{0}")]
66    Postgres(#[from] tokio_postgres::Error),
67    /// An error occurred while serializing or deserializing JSON data.
68    #[error("{0}")]
69    Serde(#[from] serde_json::Error),
70    /// An error occurred while converting a value to or from a database type.
71    #[error("invalid page token: {0}")]
72    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
73}
74
75impl PostgresBackend {
76    /// Create and initialize a new Postgres backend
77    /// with the given connection pool.
78    pub async fn new(pool: Pool) -> Result<Self> {
79        let mut conn = pool.get().await?;
80        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
81        migrations::runner()
82            .set_migration_table_name("ora.migrations")
83            .run_async(&mut **conn)
84            .await?;
85        Ok(Self {
86            pool: DbPool(pool),
87            delete_batch_size: 40_000,
88        })
89    }
90
91    /// Set the batch size for all delete operations.
92    ///
93    /// # Panics
94    ///
95    /// Panics if `batch_size` is zero.
96    pub fn with_delete_batch_size(mut self, batch_size: usize) -> Self {
97        assert!(batch_size > 0, "batch size must be greater than zero");
98        self.delete_batch_size = batch_size;
99        self
100    }
101}
102
103impl Backend for PostgresBackend {
104    type Error = Error;
105
106    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
107        if job_types.is_empty() {
108            return Ok(());
109        }
110
111        let mut conn = self.pool.get().await?;
112
113        let tx = conn.transaction().await?;
114
115        let mut col_id = Vec::with_capacity(job_types.len());
116        let mut col_description = Vec::with_capacity(job_types.len());
117        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
118        let mut col_output_schema_json = Vec::with_capacity(job_types.len());
119
120        for job_type in job_types {
121            col_id.push(job_type.id.as_str());
122            col_description.push(job_type.description.as_deref());
123            col_input_schema_json.push(job_type.input_schema_json.as_deref());
124            col_output_schema_json.push(job_type.output_schema_json.as_deref());
125        }
126
127        {
128            let stmt = tx
129                .prepare(
130                    r#"--sql
131                    INSERT INTO ora.job_type (
132                        id,
133                        description,
134                        input_schema_json,
135                        output_schema_json
136                    ) SELECT * FROM UNNEST(
137                        $1::TEXT[],
138                        $2::TEXT[],
139                        $3::TEXT[],
140                        $4::TEXT[]
141                    ) ON CONFLICT (id) DO UPDATE SET
142                        description = EXCLUDED.description,
143                        input_schema_json = EXCLUDED.input_schema_json,
144                        output_schema_json = EXCLUDED.output_schema_json
145                    WHERE
146                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
147                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
148                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
149                    "#,
150                )
151                .await?;
152
153            tx.execute(
154                &stmt,
155                &[
156                    &col_id,
157                    &col_description,
158                    &col_input_schema_json,
159                    &col_output_schema_json,
160                ],
161            )
162            .await?;
163        }
164
165        tx.commit().await?;
166
167        Ok(())
168    }
169
170    async fn list_job_types(&self) -> Result<Vec<JobType>> {
171        let mut conn = self.pool.get().await?;
172
173        let tx = conn.read_only_transaction().await?;
174
175        let job_types = {
176            let stmt = tx
177                .prepare(
178                    r#"--sql
179                    SELECT
180                        id,
181                        description,
182                        input_schema_json,
183                        output_schema_json
184                    FROM
185                        ora.job_type
186                    ORDER BY id
187                    "#,
188                )
189                .await?;
190
191            let rows = tx.query(&stmt, &[]).await?;
192
193            rows.into_iter()
194                .map(|row| {
195                    Result::<_>::Ok(JobType {
196                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
197                        description: row.try_get(1)?,
198                        input_schema_json: row.try_get(2)?,
199                        output_schema_json: row.try_get(3)?,
200                    })
201                })
202                .collect::<Result<Vec<_>>>()?
203        };
204
205        tx.commit().await?;
206
207        Ok(job_types)
208    }
209
210    async fn add_jobs(
211        &self,
212        jobs: &[NewJob],
213        if_not_exists: Option<JobFilters>,
214    ) -> Result<AddedJobs> {
215        if jobs.is_empty() {
216            return Ok(AddedJobs::Added(Vec::new()));
217        }
218
219        let mut conn = self.pool.get().await?;
220
221        let tx = conn.transaction().await?;
222
223        if let Some(filters) = if_not_exists {
224            let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;
225
226            if !existing_job_ids.is_empty() {
227                tx.commit().await?;
228                return Ok(AddedJobs::Existing(existing_job_ids));
229            }
230        }
231
232        let mut col_id = Vec::with_capacity(jobs.len());
233        let mut col_schedule_id = Vec::with_capacity(jobs.len());
234
235        {
236            let mut col_job_type_id = Vec::with_capacity(jobs.len());
237            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
238            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
239            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
240            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());
241
242            for job in jobs {
243                col_id.push(Uuid::now_v7());
244                col_job_type_id.push(job.job.job_type_id.as_str());
245                col_target_execution_time.push(job.job.target_execution_time);
246                col_input_payload_json.push(job.job.input_payload_json.as_str());
247                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
248                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
249                col_schedule_id.push(job.schedule_id.map(|s| s.0));
250            }
251
252            let stmt = tx
253                .prepare(
254                    r#"--sql
255                    INSERT INTO ora.job (
256                        id,
257                        job_type_id,
258                        target_execution_time,
259                        input_payload_json,
260                        timeout_policy_json,
261                        retry_policy_json,
262                        schedule_id
263                    ) SELECT * FROM UNNEST(
264                        $1::UUID[],
265                        $2::TEXT[],
266                        $3::TIMESTAMPTZ[],
267                        $4::TEXT[],
268                        $5::TEXT[],
269                        $6::TEXT[],
270                        $7::UUID[]
271                    )
272                    "#,
273                )
274                .await?;
275
276            tx.execute(
277                &stmt,
278                &[
279                    &col_id,
280                    &col_job_type_id,
281                    &col_target_execution_time,
282                    &col_input_payload_json,
283                    &col_timeout_policy_json,
284                    &col_retry_policy_json,
285                    &col_schedule_id,
286                ],
287            )
288            .await?;
289        }
290
291        {
292            let mut col_job_id = Vec::with_capacity(jobs.len());
293            let mut col_job_label_key = Vec::with_capacity(jobs.len());
294            let mut col_job_label_value = Vec::with_capacity(jobs.len());
295
296            for (i, job) in jobs.iter().enumerate() {
297                for label in &job.job.labels {
298                    col_job_id.push(col_id[i]);
299                    col_job_label_key.push(label.key.as_str());
300                    col_job_label_value.push(label.value.as_str());
301                }
302            }
303
304            let stmt = tx
305                .prepare(
306                    r#"--sql
307                    INSERT INTO ora.job_label (
308                        job_id,
309                        label_key,
310                        label_value
311                    ) SELECT * FROM UNNEST(
312                        $1::UUID[],
313                        $2::TEXT[],
314                        $3::TEXT[]
315                    )
316                    "#,
317                )
318                .await?;
319
320            tx.execute(
321                &stmt,
322                &[&col_job_id, &col_job_label_key, &col_job_label_value],
323            )
324            .await?;
325        }
326
327        {
328            let stmt = tx
329                .prepare(
330                    r#"--sql
331                    UPDATE ora.schedule
332                    SET
333                        active_job_id = t.job_id
334                    FROM UNNEST(
335                        $1::UUID[],
336                        $2::UUID[]
337                    ) AS t(job_id, schedule_id)
338                    WHERE
339                        ora.schedule.id = t.schedule_id
340                    "#,
341                )
342                .await?;
343
344            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
345        }
346
347        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
348        add_executions(&tx, &job_ids).await?;
349
350        tx.commit().await?;
351
352        Ok(AddedJobs::Added(job_ids))
353    }
354
355    async fn list_jobs(
356        &self,
357        filters: JobFilters,
358        order_by: Option<JobOrderBy>,
359        page_size: u32,
360        page_token: Option<NextPageToken>,
361    ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
362        let mut conn = self.pool.get().await?;
363        let tx = conn.read_only_transaction().await?;
364
365        let (jobs, next_page_token) = query::jobs::job_details(
366            &tx,
367            filters,
368            order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
369            page_size,
370            page_token.map(|t| t.0),
371        )
372        .await?;
373
374        tx.commit().await?;
375
376        Ok((jobs, next_page_token))
377    }
378
379    async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
380        let mut conn = self.pool.get().await?;
381        let tx = conn.read_only_transaction().await?;
382        let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
383        tx.commit().await?;
384        Ok(count)
385    }
386
387    async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
388        let mut conn = self.pool.get().await?;
389        let tx = conn.transaction().await?;
390        let now = std::time::SystemTime::now();
391
392        let jobs = cancel_jobs(&tx, filters).await?;
393        mark_jobs_inactive(
394            &tx,
395            &jobs.iter().map(|j| (j.job_id, now)).collect::<Vec<_>>(),
396        )
397        .await?;
398        tx.commit().await?;
399
400        Ok(jobs)
401    }
402
403    async fn add_schedules(
404        &self,
405        schedules: &[ScheduleDefinition],
406        if_not_exists: Option<ScheduleFilters>,
407    ) -> Result<AddedSchedules> {
408        if schedules.is_empty() {
409            return Ok(AddedSchedules::Added(Vec::new()));
410        }
411
412        let mut conn = self.pool.get().await?;
413        let tx = conn.transaction().await?;
414
415        if let Some(filters) = if_not_exists {
416            let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;
417
418            if !existing_schedule_ids.is_empty() {
419                tx.commit().await?;
420                return Ok(AddedSchedules::Existing(existing_schedule_ids));
421            }
422        }
423
424        let mut col_id = Vec::with_capacity(schedules.len());
425        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
426        let mut col_job_template_json = Vec::with_capacity(schedules.len());
427        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
428        let mut col_start_after = Vec::with_capacity(schedules.len());
429        let mut col_end_before = Vec::with_capacity(schedules.len());
430
431        for schedule in schedules {
432            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);
433
434            col_id.push(Uuid::now_v7());
435            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
436            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
437            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
438            col_start_after.push(start_after);
439            col_end_before.push(end_before);
440        }
441
442        {
443            let stmt = tx
444                .prepare(
445                    r#"--sql
446                    INSERT INTO ora.schedule (
447                        id,
448                        job_template_job_type_id,
449                        job_template_json,
450                        scheduling_policy_json,
451                        start_after,
452                        end_before
453                    ) SELECT * FROM UNNEST(
454                        $1::UUID[],
455                        $2::TEXT[],
456                        $3::TEXT[],
457                        $4::TEXT[],
458                        $5::TIMESTAMPTZ[],
459                        $6::TIMESTAMPTZ[]
460                    )
461                    "#,
462                )
463                .await?;
464
465            tx.execute(
466                &stmt,
467                &[
468                    &col_id,
469                    &col_job_template_job_type_id,
470                    &col_job_template_json,
471                    &col_scheduling_policy_json,
472                    &col_start_after,
473                    &col_end_before,
474                ],
475            )
476            .await?;
477        }
478
479        {
480            let mut col_schedule_id = Vec::with_capacity(schedules.len());
481            let mut col_schedule_label_key = Vec::with_capacity(schedules.len());
482            let mut col_schedule_label_value = Vec::with_capacity(schedules.len());
483
484            for (i, schedule) in schedules.iter().enumerate() {
485                for label in &schedule.labels {
486                    col_schedule_id.push(col_id[i]);
487                    col_schedule_label_key.push(label.key.as_str());
488                    col_schedule_label_value.push(label.value.as_str());
489                }
490            }
491
492            let stmt = tx
493                .prepare(
494                    r#"--sql
495                    INSERT INTO ora.schedule_label (
496                        schedule_id,
497                        label_key,
498                        label_value
499                    ) SELECT * FROM UNNEST(
500                        $1::UUID[],
501                        $2::TEXT[],
502                        $3::TEXT[]
503                    )
504                    "#,
505                )
506                .await?;
507
508            tx.execute(
509                &stmt,
510                &[
511                    &col_schedule_id,
512                    &col_schedule_label_key,
513                    &col_schedule_label_value,
514                ],
515            )
516            .await?;
517        }
518
519        tx.commit().await?;
520
521        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();
522
523        Ok(AddedSchedules::Added(schedule_ids))
524    }
525
526    async fn list_schedules(
527        &self,
528        filters: ScheduleFilters,
529        order_by: Option<ScheduleOrderBy>,
530        page_size: u32,
531        page_token: Option<NextPageToken>,
532    ) -> Result<(
533        Vec<ScheduleDetails>,
534        Option<ora_backend::common::NextPageToken>,
535    )> {
536        let mut conn = self.pool.get().await?;
537        let tx = conn.read_only_transaction().await?;
538
539        let (schedules, next_page_token) = query::schedules::schedule_details(
540            &tx,
541            filters,
542            order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
543            page_size,
544            page_token.map(|t| t.0),
545        )
546        .await?;
547
548        tx.commit().await?;
549
550        Ok((schedules, next_page_token))
551    }
552
553    async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
554        let mut conn = self.pool.get().await?;
555        let tx = conn.read_only_transaction().await?;
556        let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
557        tx.commit().await?;
558        Ok(count)
559    }
560
561    async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
562        let mut conn = self.pool.get().await?;
563        let tx = conn.transaction().await?;
564        let schedules = query::schedules::stop_schedules(&tx, filters).await?;
565        tx.commit().await?;
566        Ok(schedules)
567    }
568
569    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
570        async_stream::try_stream!({
571            let mut last_execution_id: Option<ExecutionId> = None;
572
573            loop {
574                let mut conn = self.pool.get().await?;
575                let tx = conn.read_only_transaction().await?;
576
577                let rows = if let Some(last_execution_id) = last_execution_id {
578                    let stmt = tx
579                        .prepare(
580                            r#"--sql
581                            SELECT
582                                ora.execution.id,
583                                ora.job.id,
584                                ora.job.job_type_id,
585                                ora.job.input_payload_json,
586                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
587                                ora.job.retry_policy_json,
588                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
589                            FROM
590                                ora.execution
591                            JOIN ora.job ON
592                                ora.execution.job_id = ora.job.id
593                            WHERE
594                                status = 0
595                                AND ora.job.target_execution_time <= NOW()
596                                AND ora.execution.id > $1::UUID
597                            ORDER BY
598                                ora.execution.id ASC
599                            LIMIT 1000
600                            "#,
601                        )
602                        .await?;
603
604                    tx.query(&stmt, &[&last_execution_id.0]).await?
605                } else {
606                    let stmt = tx
607                        .prepare(
608                            r#"--sql
609                            SELECT
610                                ora.execution.id,
611                                ora.job.id,
612                                ora.job.job_type_id,
613                                ora.job.input_payload_json,
614                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
615                                ora.job.retry_policy_json,
616                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
617                            FROM
618                                ora.execution
619                            JOIN ora.job ON
620                                ora.execution.job_id = ora.job.id
621                            WHERE
622                                status = 0
623                                AND ora.job.target_execution_time <= NOW()
624                            ORDER BY
625                                ora.execution.id ASC
626                            LIMIT 1000
627                            "#,
628                        )
629                        .await?;
630
631                    tx.query(&stmt, &[]).await?
632                };
633
634                tx.commit().await?;
635
636                if rows.is_empty() {
637                    break;
638                }
639
640                let mut ready_executions = Vec::with_capacity(rows.len());
641
642                for row in rows {
643                    ready_executions.push(ReadyExecution {
644                        execution_id: ExecutionId(row.try_get(0)?),
645                        job_id: JobId(row.try_get(1)?),
646                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
647                        input_payload_json: row.try_get(3)?,
648                        attempt_number: row.try_get::<_, i64>(4)? as u64,
649                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
650                        target_execution_time: systemtime_from_ts(row.try_get::<_, f64>(6)?),
651                    });
652                }
653
654                last_execution_id = ready_executions.last().map(|e| e.execution_id);
655
656                yield ready_executions;
657            }
658        })
659    }
660
661    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
662        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();
663
664        loop {
665            let mut conn = self.pool.get().await?;
666
667            let next_timestamp = {
668                let tx = conn.read_only_transaction().await?;
669
670                let stmt = tx
671                    .prepare(
672                        r#"--sql
673                        SELECT
674                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
675                        FROM
676                            ora.job
677                        JOIN ora.execution ON
678                            ora.execution.job_id = ora.job.id
679                        WHERE
680                            ora.execution.status = 0
681                            AND NOT (ora.execution.id = ANY($1::UUID[]))
682                        ORDER BY
683                            ora.job.target_execution_time ASC
684                        LIMIT 1
685                        "#,
686                    )
687                    .await?;
688
689                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;
690
691                tx.commit().await?;
692
693                match row {
694                    Some(row) => row.try_get::<_, Option<f64>>(0)?.map(systemtime_from_ts),
695                    None => None,
696                }
697            };
698
699            drop(conn);
700
701            if let Some(next_timestamp) = next_timestamp {
702                let now = std::time::SystemTime::now();
703
704                if next_timestamp <= now {
705                    break;
706                }
707
708                tokio::time::sleep(
709                    next_timestamp
710                        .duration_since(now)
711                        .unwrap_or_else(|_| Duration::from_secs(0)),
712                )
713                .await;
714                break;
715            }
716
717            tokio::time::sleep(Duration::from_millis(500)).await;
718        }
719
720        Ok(())
721    }
722
723    fn in_progress_executions(
724        &self,
725    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
726        async_stream::try_stream!({
727            let mut last_execution_id: Option<ExecutionId> = None;
728
729            loop {
730                let mut conn = self.pool.get().await?;
731                let tx = conn.read_only_transaction().await?;
732
733                let rows = if let Some(last_execution_id) = last_execution_id {
734                    let stmt = tx
735                        .prepare(
736                            r#"--sql
737                            SELECT
738                                ora.execution.id,
739                                ora.job.id,
740                                ora.execution.executor_id,
741                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
742                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
743                                ora.job.timeout_policy_json,
744                                ora.job.retry_policy_json,
745                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
746                            FROM
747                                ora.execution
748                            JOIN ora.job ON
749                                ora.execution.job_id = ora.job.id
750                            WHERE
751                                status = 1
752                                AND ora.execution.id > $1::UUID
753                            ORDER BY
754                                ora.execution.id ASC
755                            LIMIT 1000
756                            "#,
757                        )
758                        .await?;
759
760                    tx.query(&stmt, &[&last_execution_id.0]).await?
761                } else {
762                    let stmt = tx
763                        .prepare(
764                            r#"--sql
765                            SELECT
766                                ora.execution.id,
767                                ora.job.id,
768                                ora.execution.executor_id,
769                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
770                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
771                                ora.job.timeout_policy_json,
772                                ora.job.retry_policy_json,
773                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
774                            FROM
775                                ora.execution
776                            JOIN ora.job ON
777                                ora.execution.job_id = ora.job.id
778                            WHERE
779                                status = 1
780                            ORDER BY
781                                ora.execution.id ASC
782                            LIMIT 1000
783                        "#,
784                        )
785                        .await?;
786
787                    tx.query(&stmt, &[]).await?
788                };
789
790                tx.commit().await?;
791
792                if rows.is_empty() {
793                    break;
794                }
795
796                let mut in_progress_executions = Vec::with_capacity(rows.len());
797
798                for row in rows {
799                    in_progress_executions.push(InProgressExecution {
800                        execution_id: ExecutionId(row.try_get(0)?),
801                        job_id: JobId(row.try_get(1)?),
802                        executor_id: ExecutorId(row.try_get(2)?),
803                        target_execution_time: systemtime_from_ts(row.try_get::<_, f64>(3)?),
804                        started_at: systemtime_from_ts(row.try_get::<_, f64>(4)?),
805                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
806                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
807                        attempt_number: row.try_get::<_, i64>(7)? as u64,
808                    });
809                }
810
811                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);
812
813                yield in_progress_executions;
814            }
815        })
816    }
817
818    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
819        if executions.is_empty() {
820            return Ok(());
821        }
822
823        let mut conn = self.pool.get().await?;
824
825        let tx = conn.transaction().await?;
826
827        let stmt = tx
828            .prepare(
829                r#"--sql
830                UPDATE ora.execution
831                SET
832                    executor_id = t.executor_id,
833                    started_at = to_timestamp(t.started_at)
834                FROM UNNEST(
835                    $1::UUID[],
836                    $2::UUID[],
837                    $3::DOUBLE PRECISION[]
838                ) AS t(execution_id, executor_id, started_at)
839                WHERE
840                    execution_id = id
841                    AND ora.execution.started_at IS NULL
842                "#,
843            )
844            .await?;
845
846        let mut col_execution_id = Vec::with_capacity(executions.len());
847        let mut col_executor_id = Vec::with_capacity(executions.len());
848        let mut col_started_at = Vec::with_capacity(executions.len());
849
850        for execution in executions {
851            col_execution_id.push(execution.execution_id.0);
852            col_executor_id.push(execution.executor_id.0);
853            col_started_at.push(systemtime_to_ts(execution.started_at));
854        }
855
856        tx.execute(
857            &stmt,
858            &[&col_execution_id, &col_executor_id, &col_started_at],
859        )
860        .await?;
861
862        tx.commit().await?;
863
864        Ok(())
865    }
866
867    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
868        if executions.is_empty() {
869            return Ok(());
870        }
871
872        let mut conn = self.pool.get().await?;
873
874        let tx = conn.transaction().await?;
875
876        let stmt = tx
877            .prepare(
878                r#"--sql
879                UPDATE ora.execution
880                SET
881                    succeeded_at = to_timestamp(t.succeeded_at),
882                    output_json = t.output_json
883                FROM UNNEST(
884                    $1::UUID[],
885                    $2::DOUBLE PRECISION[],
886                    $3::TEXT[]
887                ) AS t(execution_id, succeeded_at, output_json)
888                WHERE
889                    execution_id = id
890                    AND status < 2
891                RETURNING job_id, t.succeeded_at
892                "#,
893            )
894            .await?;
895
896        let mut col_execution_id = Vec::with_capacity(executions.len());
897        let mut col_succeeded_at = Vec::with_capacity(executions.len());
898        let mut col_output_json = Vec::with_capacity(executions.len());
899
900        for execution in executions {
901            col_execution_id.push(execution.execution_id.0);
902            col_succeeded_at.push(systemtime_to_ts(execution.succeeded_at));
903            col_output_json.push(execution.output_json.as_str());
904        }
905
906        let rows = tx
907            .query(
908                &stmt,
909                &[&col_execution_id, &col_succeeded_at, &col_output_json],
910            )
911            .await?;
912
913        let mut jobs = Vec::with_capacity(rows.len());
914        for row in rows {
915            jobs.push((
916                JobId(row.try_get(0)?),
917                systemtime_from_ts(row.try_get::<_, f64>(1)?),
918            ));
919        }
920
921        mark_jobs_inactive(&tx, &jobs).await?;
922
923        tx.commit().await?;
924
925        Ok(())
926    }
927
928    async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
929        if executions.is_empty() {
930            return Ok(());
931        }
932
933        let mut conn = self.pool.get().await?;
934        let tx = conn.transaction().await?;
935
936        let jobs = executions_failed(&tx, executions).await?;
937
938        mark_jobs_inactive(&tx, &jobs).await?;
939
940        tx.commit().await?;
941
942        Ok(())
943    }
944
945    async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
946        if executions.is_empty() {
947            return Ok(());
948        }
949
950        let mut conn = self.pool.get().await?;
951        let tx = conn.transaction().await?;
952
953        let jobs = executions_failed(&tx, executions).await?;
954        add_executions(
955            &tx,
956            &jobs.into_iter().map(|(id, ..)| id).collect::<Vec<_>>(),
957        )
958        .await?;
959
960        tx.commit().await?;
961
962        Ok(())
963    }
964
965    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
966        async_stream::try_stream!({
967            let mut last_schedule_id: Option<ScheduleId> = None;
968            loop {
969                let mut conn = self.pool.get().await?;
970                let tx = conn.read_only_transaction().await?;
971
972                let rows = if let Some(last_schedule_id) = last_schedule_id {
973                    let stmt = tx
974                        .prepare(
975                            r#"--sql
976                            SELECT
977                                id,
978                                EXTRACT(EPOCH FROM(
979                                    SELECT
980                                        target_execution_time
981                                    FROM
982                                        ora.job
983                                    WHERE
984                                        ora.job.schedule_id = ora.schedule.id
985                                    ORDER BY ora.job.id DESC
986                                    LIMIT 1
987                                ))::DOUBLE PRECISION,
988                                scheduling_policy_json,
989                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
990                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
991                                job_template_json
992                            FROM
993                                ora.schedule
994                            WHERE
995                                (
996                                    ora.schedule.stopped_at IS NULL
997                                    AND ora.schedule.active_job_id IS NULL
998                                )
999                                AND id > $1::UUID
1000                            ORDER BY id ASC
1001                            LIMIT 1000
1002                            "#,
1003                        )
1004                        .await?;
1005
1006                    tx.query(&stmt, &[&last_schedule_id.0]).await?
1007                } else {
1008                    let stmt = tx
1009                        .prepare(
1010                            r#"--sql
1011                            SELECT
1012                                id,
1013                                EXTRACT(EPOCH FROM(
1014                                    SELECT
1015                                        target_execution_time
1016                                    FROM
1017                                        ora.job
1018                                    WHERE
1019                                        ora.job.schedule_id = ora.schedule.id
1020                                    ORDER BY ora.job.id DESC
1021                                    LIMIT 1
1022                                ))::DOUBLE PRECISION,
1023                                scheduling_policy_json,
1024                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
1025                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
1026                                job_template_json
1027                            FROM
1028                                ora.schedule
1029                            WHERE
1030                                ora.schedule.stopped_at IS NULL
1031                                AND ora.schedule.active_job_id IS NULL
1032                            ORDER BY id ASC
1033                            LIMIT 1000
1034                            "#,
1035                        )
1036                        .await?;
1037
1038                    tx.query(&stmt, &[]).await?
1039                };
1040
1041                tx.commit().await?;
1042
1043                if rows.is_empty() {
1044                    break;
1045                }
1046
1047                let mut schedules = Vec::with_capacity(rows.len());
1048
1049                for row in rows {
1050                    schedules.push(PendingSchedule {
1051                        schedule_id: ScheduleId(row.try_get(0)?),
1052                        last_target_execution_time: row
1053                            .try_get::<_, Option<f64>>(1)?
1054                            .map(systemtime_from_ts),
1055                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
1056                        time_range: TimeRange {
1057                            start: row.try_get::<_, Option<f64>>(3)?.map(systemtime_from_ts),
1058                            end: row.try_get::<_, Option<f64>>(4)?.map(systemtime_from_ts),
1059                        },
1060                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
1061                    });
1062                }
1063
1064                last_schedule_id = schedules.last().map(|s| s.schedule_id);
1065                yield schedules;
1066            }
1067        })
1068    }
1069
1070    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1071        let batch_size = i64::try_from(self.delete_batch_size).unwrap_or(i64::MAX);
1072
1073        let mut conn = self.pool.get().await?;
1074
1075        // delete in a loop so that we don't lock
1076        // tables for too long
1077        loop {
1078            let mut deleted_rows = 0;
1079
1080            {
1081                let tx = conn.transaction().await?;
1082
1083                let stmt = tx
1084                    .prepare(
1085                        r#"--sql
1086                        WITH cte AS (
1087                            SELECT ctid
1088                            FROM ora.job
1089                            WHERE inactive_since < to_timestamp($1)
1090                            ORDER BY inactive_since
1091                            LIMIT $2
1092                        )
1093                        DELETE FROM ora.job j
1094                        USING cte
1095                        WHERE j.ctid = cte.ctid;
1096                        "#,
1097                    )
1098                    .await?;
1099
1100                let deleted = tx
1101                    .execute(&stmt, &[&systemtime_to_ts(before), &batch_size])
1102                    .await?;
1103
1104                deleted_rows += deleted;
1105
1106                tx.commit().await?;
1107            }
1108
1109            {
1110                let tx = conn.transaction().await?;
1111                let stmt = tx
1112                    .prepare(
1113                        r#"--sql
1114                        WITH cte AS (
1115                            SELECT ctid
1116                            FROM ora.schedule
1117                            WHERE stopped_at < to_timestamp($1)
1118                            ORDER BY stopped_at
1119                            LIMIT $2
1120                        )
1121                        DELETE FROM ora.schedule s
1122                        USING cte
1123                        WHERE s.ctid = cte.ctid;
1124                        "#,
1125                    )
1126                    .await?;
1127
1128                let deleted = tx
1129                    .execute(&stmt, &[&systemtime_to_ts(before), &batch_size])
1130                    .await?;
1131
1132                tx.commit().await?;
1133
1134                deleted_rows += deleted;
1135            }
1136
1137            {
1138                let tx = conn.transaction().await?;
1139
1140                let stmt = tx
1141                    .prepare(
1142                        r#"--sql
1143                        DELETE FROM ora.job_type
1144                        WHERE
1145                            NOT EXISTS (
1146                                SELECT FROM ora.job
1147                                WHERE
1148                                    ora.job.job_type_id = ora.job_type.id
1149                            )
1150                            AND NOT EXISTS (
1151                                SELECT FROM ora.schedule
1152                                WHERE
1153                                    ora.schedule.job_template_job_type_id = ora.job_type.id                                
1154                            )
1155                        "#,
1156                    )
1157                    .await?;
1158
1159                tx.execute(&stmt, &[]).await?;
1160
1161                tx.commit().await?;
1162            }
1163
1164            if deleted_rows == 0 {
1165                break;
1166            }
1167        }
1168
1169        Ok(())
1170    }
1171
1172    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1173        loop {
1174            let mut conn = self.pool.get().await?;
1175
1176            let has_pending = {
1177                let tx = conn.read_only_transaction().await?;
1178
1179                let stmt = tx
1180                    .prepare(
1181                        r#"--sql
1182                        SELECT 1 FROM ora.schedule
1183                        WHERE
1184                            ora.schedule.stopped_at IS NULL
1185                            AND ora.schedule.active_job_id IS NULL
1186                        LIMIT 1
1187                        "#,
1188                    )
1189                    .await?;
1190
1191                let row = tx.query_opt(&stmt, &[]).await?;
1192
1193                tx.commit().await?;
1194
1195                row.is_some()
1196            };
1197
1198            drop(conn);
1199
1200            if has_pending {
1201                break;
1202            }
1203
1204            tokio::time::sleep(Duration::from_millis(500)).await;
1205        }
1206
1207        Ok(())
1208    }
1209}
1210
1211async fn executions_failed(
1212    tx: &DbTransaction<'_>,
1213    executions: &[FailedExecution],
1214) -> Result<Vec<(JobId, SystemTime)>> {
1215    let stmt = tx
1216        .prepare(
1217            r#"--sql
1218            UPDATE ora.execution
1219            SET
1220                failed_at = to_timestamp(t.failed_at),
1221                failure_reason = t.failure_reason
1222            FROM UNNEST(
1223                $1::UUID[],
1224                $2::DOUBLE PRECISION[],
1225                $3::TEXT[]
1226            ) AS t(execution_id, failed_at, failure_reason)
1227            WHERE
1228                execution_id = id
1229                AND status < 2
1230            RETURNING job_id, t.failed_at
1231            "#,
1232        )
1233        .await?;
1234
1235    let mut col_execution_id = Vec::with_capacity(executions.len());
1236    let mut col_succeeded_at = Vec::with_capacity(executions.len());
1237    let mut col_failure_reason = Vec::with_capacity(executions.len());
1238
1239    for execution in executions {
1240        col_execution_id.push(execution.execution_id.0);
1241        col_succeeded_at.push(systemtime_to_ts(execution.failed_at));
1242        col_failure_reason.push(execution.failure_reason.as_str());
1243    }
1244
1245    let rows = tx
1246        .query(
1247            &stmt,
1248            &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1249        )
1250        .await?;
1251
1252    let mut jobs = Vec::with_capacity(rows.len());
1253
1254    for row in rows {
1255        jobs.push((
1256            JobId(row.try_get(0)?),
1257            systemtime_from_ts(row.try_get::<_, f64>(1)?),
1258        ));
1259    }
1260
1261    Ok(jobs)
1262}
1263
1264async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1265    if jobs.is_empty() {
1266        return Ok(());
1267    }
1268
1269    let mut col_job_id = Vec::with_capacity(jobs.len());
1270    let mut col_execution_id = Vec::with_capacity(jobs.len());
1271
1272    for job in jobs {
1273        col_job_id.push(job.0);
1274        col_execution_id.push(Uuid::now_v7());
1275    }
1276
1277    let stmt = tx
1278        .prepare(
1279            r#"--sql
1280            INSERT INTO ora.execution (
1281                id,
1282                job_id
1283            ) SELECT * FROM UNNEST(
1284                $1::UUID[],
1285                $2::UUID[]
1286            )
1287            "#,
1288        )
1289        .await?;
1290
1291    tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1292
1293    Ok(())
1294}
1295
1296async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[(JobId, SystemTime)]) -> Result<()> {
1297    if jobs.is_empty() {
1298        return Ok(());
1299    }
1300
1301    let mut col_job_id = Vec::with_capacity(jobs.len());
1302    let mut col_inactive_since = Vec::with_capacity(jobs.len());
1303
1304    for (job, inactive_since) in jobs {
1305        col_job_id.push(job.0);
1306        col_inactive_since.push(systemtime_to_ts(*inactive_since));
1307    }
1308
1309    {
1310        let stmt = tx
1311            .prepare(
1312                r#"--sql
1313                UPDATE ora.job
1314                SET inactive_since = to_timestamp(t.inactive_since)
1315                FROM
1316                    UNNEST(
1317                        $1::UUID[],
1318                        $2::DOUBLE PRECISION[]
1319                    ) AS t(job_id, inactive_since)
1320                WHERE
1321                    id = t.job_id;
1322                "#,
1323            )
1324            .await?;
1325
1326        tx.execute(&stmt, &[&col_job_id, &col_inactive_since])
1327            .await?;
1328    }
1329
1330    {
1331        let stmt = tx
1332            .prepare(
1333                r#"--sql
1334                UPDATE ora.schedule
1335                SET
1336                    active_job_id = NULL
1337                WHERE active_job_id = ANY($1::UUID[])
1338                "#,
1339            )
1340            .await?;
1341
1342        tx.execute(&stmt, &[&col_job_id]).await?;
1343    }
1344
1345    Ok(())
1346}