Skip to main content

ora_backend_postgres/
lib.rs

1#![allow(missing_docs)]
2
3use std::time::{Duration, UNIX_EPOCH};
4
5use deadpool_postgres::{Pool, PoolError};
6use futures::Stream;
7use ora_backend::{
8    Backend,
9    common::{NextPageToken, TimeRange},
10    executions::{
11        ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
12        SucceededExecution,
13    },
14    executors::ExecutorId,
15    jobs::{
16        AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
17        NewJob,
18    },
19    schedules::{
20        AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
21        ScheduleId, ScheduleOrderBy, StoppedSchedule,
22    },
23};
24
25use refinery::embed_migrations;
26use thiserror::Error;
27use uuid::Uuid;
28
29use crate::{
30    db::{DbPool, DbTransaction},
31    query::{
32        jobs::{cancel_jobs, job_count},
33        schedules::schedule_count,
34    },
35};
36
37embed_migrations!("./migrations");
38
39mod db;
40mod models;
41mod query;
42
43pub struct PostgresBackend {
44    pool: DbPool,
45}
46
47type Result<T> = core::result::Result<T, Error>;
48
49/// Errors that can occur when using the Postgres backend.
50#[derive(Error, Debug)]
51pub enum Error {
52    #[error("{0}")]
53    Migrations(#[from] refinery::Error),
54    #[error("{0}")]
55    Pool(#[from] PoolError),
56    #[error("{0}")]
57    Postgres(#[from] tokio_postgres::Error),
58    #[error("{0}")]
59    Serde(#[from] serde_json::Error),
60    #[error("invalid page token: {0}")]
61    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
62}
63
64impl PostgresBackend {
65    /// Create and initialize a new Postgres backend
66    /// with the given connection pool.
67    pub async fn new(pool: Pool) -> Result<Self> {
68        let mut conn = pool.get().await?;
69        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
70        migrations::runner()
71            .set_migration_table_name("ora.migrations")
72            .run_async(&mut **conn)
73            .await?;
74        Ok(Self { pool: DbPool(pool) })
75    }
76}
77
78impl Backend for PostgresBackend {
79    type Error = Error;
80
81    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
82        if job_types.is_empty() {
83            return Ok(());
84        }
85
86        let mut conn = self.pool.get().await?;
87
88        let tx = conn.transaction().await?;
89
90        let mut col_id = Vec::with_capacity(job_types.len());
91        let mut col_description = Vec::with_capacity(job_types.len());
92        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
93        let mut col_output_schema_json = Vec::with_capacity(job_types.len());
94
95        for job_type in job_types {
96            col_id.push(job_type.id.as_str());
97            col_description.push(job_type.description.as_deref());
98            col_input_schema_json.push(job_type.input_schema_json.as_deref());
99            col_output_schema_json.push(job_type.output_schema_json.as_deref());
100        }
101
102        {
103            let stmt = tx
104                .prepare(
105                    r#"--sql
106                    INSERT INTO ora.job_type (
107                        id,
108                        description,
109                        input_schema_json,
110                        output_schema_json
111                    ) SELECT * FROM UNNEST(
112                        $1::TEXT[],
113                        $2::TEXT[],
114                        $3::TEXT[],
115                        $4::TEXT[]
116                    ) ON CONFLICT (id) DO UPDATE SET
117                        description = EXCLUDED.description,
118                        input_schema_json = EXCLUDED.input_schema_json,
119                        output_schema_json = EXCLUDED.output_schema_json
120                    WHERE
121                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
122                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
123                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
124                    "#,
125                )
126                .await?;
127
128            tx.execute(
129                &stmt,
130                &[
131                    &col_id,
132                    &col_description,
133                    &col_input_schema_json,
134                    &col_output_schema_json,
135                ],
136            )
137            .await?;
138        }
139
140        tx.commit().await?;
141
142        Ok(())
143    }
144
145    async fn list_job_types(&self) -> Result<Vec<JobType>> {
146        let mut conn = self.pool.get().await?;
147
148        let tx = conn.read_only_transaction().await?;
149
150        let job_types = {
151            let stmt = tx
152                .prepare(
153                    r#"--sql
154                    SELECT
155                        id,
156                        description,
157                        input_schema_json,
158                        output_schema_json
159                    FROM
160                        ora.job_type
161                    ORDER BY id
162                    "#,
163                )
164                .await?;
165
166            let rows = tx.query(&stmt, &[]).await?;
167
168            rows.into_iter()
169                .map(|row| {
170                    Result::<_>::Ok(JobType {
171                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
172                        description: row.try_get(1)?,
173                        input_schema_json: row.try_get(2)?,
174                        output_schema_json: row.try_get(3)?,
175                    })
176                })
177                .collect::<Result<Vec<_>>>()?
178        };
179
180        tx.commit().await?;
181
182        Ok(job_types)
183    }
184
185    async fn add_jobs(
186        &self,
187        jobs: &[NewJob],
188        if_not_exists: Option<JobFilters>,
189    ) -> Result<AddedJobs> {
190        if jobs.is_empty() {
191            return Ok(AddedJobs::Added(Vec::new()));
192        }
193
194        let mut conn = self.pool.get().await?;
195
196        let tx = conn.transaction().await?;
197
198        if let Some(filters) = if_not_exists {
199            let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;
200
201            if !existing_job_ids.is_empty() {
202                tx.commit().await?;
203                return Ok(AddedJobs::Existing(existing_job_ids));
204            }
205        }
206
207        let mut col_id = Vec::with_capacity(jobs.len());
208        let mut col_schedule_id = Vec::with_capacity(jobs.len());
209
210        {
211            let mut col_job_type_id = Vec::with_capacity(jobs.len());
212            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
213            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
214            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
215            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());
216
217            for job in jobs {
218                col_id.push(Uuid::now_v7());
219                col_job_type_id.push(job.job.job_type_id.as_str());
220                col_target_execution_time.push(job.job.target_execution_time);
221                col_input_payload_json.push(job.job.input_payload_json.as_str());
222                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
223                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
224                col_schedule_id.push(job.schedule_id.map(|s| s.0));
225            }
226
227            let stmt = tx
228                .prepare(
229                    r#"--sql
230                    INSERT INTO ora.job (
231                        id,
232                        job_type_id,
233                        target_execution_time,
234                        input_payload_json,
235                        timeout_policy_json,
236                        retry_policy_json,
237                        schedule_id
238                    ) SELECT * FROM UNNEST(
239                        $1::UUID[],
240                        $2::TEXT[],
241                        $3::TIMESTAMPTZ[],
242                        $4::TEXT[],
243                        $5::TEXT[],
244                        $6::TEXT[],
245                        $7::UUID[]
246                    )
247                    "#,
248                )
249                .await?;
250
251            tx.execute(
252                &stmt,
253                &[
254                    &col_id,
255                    &col_job_type_id,
256                    &col_target_execution_time,
257                    &col_input_payload_json,
258                    &col_timeout_policy_json,
259                    &col_retry_policy_json,
260                    &col_schedule_id,
261                ],
262            )
263            .await?;
264        }
265
266        {
267            let mut col_job_id = Vec::with_capacity(jobs.len());
268            let mut col_job_label_key = Vec::with_capacity(jobs.len());
269            let mut col_job_label_value = Vec::with_capacity(jobs.len());
270
271            for (i, job) in jobs.iter().enumerate() {
272                for label in &job.job.labels {
273                    col_job_id.push(col_id[i]);
274                    col_job_label_key.push(label.key.as_str());
275                    col_job_label_value.push(label.value.as_str());
276                }
277            }
278
279            let stmt = tx
280                .prepare(
281                    r#"--sql
282                    INSERT INTO ora.job_label (
283                        job_id,
284                        label_key,
285                        label_value
286                    ) SELECT * FROM UNNEST(
287                        $1::UUID[],
288                        $2::TEXT[],
289                        $3::TEXT[]
290                    )
291                    "#,
292                )
293                .await?;
294
295            tx.execute(
296                &stmt,
297                &[&col_job_id, &col_job_label_key, &col_job_label_value],
298            )
299            .await?;
300        }
301
302        {
303            let stmt = tx
304                .prepare(
305                    r#"--sql
306                    UPDATE ora.schedule
307                    SET
308                        active_job_id = t.job_id
309                    FROM UNNEST(
310                        $1::UUID[],
311                        $2::UUID[]
312                    ) AS t(job_id, schedule_id)
313                    WHERE
314                        ora.schedule.id = t.schedule_id
315                    "#,
316                )
317                .await?;
318
319            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
320        }
321
322        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
323        add_executions(&tx, &job_ids).await?;
324
325        tx.commit().await?;
326
327        Ok(AddedJobs::Added(job_ids))
328    }
329
330    async fn list_jobs(
331        &self,
332        filters: JobFilters,
333        order_by: Option<JobOrderBy>,
334        page_size: u32,
335        page_token: Option<NextPageToken>,
336    ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
337        let mut conn = self.pool.get().await?;
338        let tx = conn.read_only_transaction().await?;
339
340        let (jobs, next_page_token) = query::jobs::job_details(
341            &tx,
342            filters,
343            order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
344            page_size,
345            page_token.map(|t| t.0),
346        )
347        .await?;
348
349        tx.commit().await?;
350
351        Ok((jobs, next_page_token))
352    }
353
354    async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
355        let mut conn = self.pool.get().await?;
356        let tx = conn.read_only_transaction().await?;
357        let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
358        tx.commit().await?;
359        Ok(count)
360    }
361
362    async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
363        let mut conn = self.pool.get().await?;
364        let tx = conn.transaction().await?;
365        let jobs = cancel_jobs(&tx, filters).await?;
366
367        let job_ids = jobs.iter().map(|j| j.job_id).collect::<Vec<_>>();
368
369        mark_jobs_inactive(&tx, &job_ids).await?;
370
371        tx.commit().await?;
372
373        Ok(jobs)
374    }
375
376    async fn add_schedules(
377        &self,
378        schedules: &[ScheduleDefinition],
379        if_not_exists: Option<ScheduleFilters>,
380    ) -> Result<AddedSchedules> {
381        if schedules.is_empty() {
382            return Ok(AddedSchedules::Added(Vec::new()));
383        }
384
385        let mut conn = self.pool.get().await?;
386        let tx = conn.transaction().await?;
387
388        if let Some(filters) = if_not_exists {
389            let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;
390
391            if !existing_schedule_ids.is_empty() {
392                tx.commit().await?;
393                return Ok(AddedSchedules::Existing(existing_schedule_ids));
394            }
395        }
396
397        let mut col_id = Vec::with_capacity(schedules.len());
398        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
399        let mut col_job_template_json = Vec::with_capacity(schedules.len());
400        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
401        let mut col_start_after = Vec::with_capacity(schedules.len());
402        let mut col_end_before = Vec::with_capacity(schedules.len());
403
404        for schedule in schedules {
405            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);
406
407            col_id.push(Uuid::now_v7());
408            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
409            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
410            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
411            col_start_after.push(start_after);
412            col_end_before.push(end_before);
413        }
414
415        {
416            let stmt = tx
417                .prepare(
418                    r#"--sql
419                    INSERT INTO ora.schedule (
420                        id,
421                        job_template_job_type_id,
422                        job_template_json,
423                        scheduling_policy_json,
424                        start_after,
425                        end_before
426                    ) SELECT * FROM UNNEST(
427                        $1::UUID[],
428                        $2::TEXT[],
429                        $3::TEXT[],
430                        $4::TEXT[],
431                        $5::TIMESTAMPTZ[],
432                        $6::TIMESTAMPTZ[]
433                    )
434                    "#,
435                )
436                .await?;
437
438            tx.execute(
439                &stmt,
440                &[
441                    &col_id,
442                    &col_job_template_job_type_id,
443                    &col_job_template_json,
444                    &col_scheduling_policy_json,
445                    &col_start_after,
446                    &col_end_before,
447                ],
448            )
449            .await?;
450        }
451
452        {
453            let mut col_schedule_id = Vec::with_capacity(schedules.len());
454            let mut col_schedule_label_key = Vec::with_capacity(schedules.len());
455            let mut col_schedule_label_value = Vec::with_capacity(schedules.len());
456
457            for (i, schedule) in schedules.iter().enumerate() {
458                for label in &schedule.labels {
459                    col_schedule_id.push(col_id[i]);
460                    col_schedule_label_key.push(label.key.as_str());
461                    col_schedule_label_value.push(label.value.as_str());
462                }
463            }
464
465            let stmt = tx
466                .prepare(
467                    r#"--sql
468                    INSERT INTO ora.schedule_label (
469                        schedule_id,
470                        label_key,
471                        label_value
472                    ) SELECT * FROM UNNEST(
473                        $1::UUID[],
474                        $2::TEXT[],
475                        $3::TEXT[]
476                    )
477                    "#,
478                )
479                .await?;
480
481            tx.execute(
482                &stmt,
483                &[
484                    &col_schedule_id,
485                    &col_schedule_label_key,
486                    &col_schedule_label_value,
487                ],
488            )
489            .await?;
490        }
491
492        tx.commit().await?;
493
494        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();
495
496        Ok(AddedSchedules::Added(schedule_ids))
497    }
498
499    async fn list_schedules(
500        &self,
501        filters: ScheduleFilters,
502        order_by: Option<ScheduleOrderBy>,
503        page_size: u32,
504        page_token: Option<NextPageToken>,
505    ) -> Result<(
506        Vec<ScheduleDetails>,
507        Option<ora_backend::common::NextPageToken>,
508    )> {
509        let mut conn = self.pool.get().await?;
510        let tx = conn.read_only_transaction().await?;
511
512        let (schedules, next_page_token) = query::schedules::schedule_details(
513            &tx,
514            filters,
515            order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
516            page_size,
517            page_token.map(|t| t.0),
518        )
519        .await?;
520
521        tx.commit().await?;
522
523        Ok((schedules, next_page_token))
524    }
525
526    async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
527        let mut conn = self.pool.get().await?;
528        let tx = conn.read_only_transaction().await?;
529        let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
530        tx.commit().await?;
531        Ok(count)
532    }
533
534    async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
535        let mut conn = self.pool.get().await?;
536        let tx = conn.transaction().await?;
537        let schedules = query::schedules::stop_schedules(&tx, filters).await?;
538        tx.commit().await?;
539        Ok(schedules)
540    }
541
542    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
543        async_stream::try_stream!({
544            let mut last_execution_id: Option<ExecutionId> = None;
545
546            loop {
547                let mut conn = self.pool.get().await?;
548                let tx = conn.read_only_transaction().await?;
549
550                let rows = if let Some(last_execution_id) = last_execution_id {
551                    let stmt = tx
552                        .prepare(
553                            r#"--sql
554                            SELECT
555                                ora.execution.id,
556                                ora.job.id,
557                                ora.job.job_type_id,
558                                ora.job.input_payload_json,
559                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
560                                ora.job.retry_policy_json,
561                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
562                            FROM
563                                ora.execution
564                            JOIN ora.job ON
565                                ora.execution.job_id = ora.job.id
566                            WHERE
567                                status = 0
568                                AND ora.job.target_execution_time <= NOW()
569                                AND ora.execution.id > $1::UUID
570                            ORDER BY
571                                ora.execution.id ASC
572                            LIMIT 1000
573                            "#,
574                        )
575                        .await?;
576
577                    tx.query(&stmt, &[&last_execution_id.0]).await?
578                } else {
579                    let stmt = tx
580                        .prepare(
581                            r#"--sql
582                            SELECT
583                                ora.execution.id,
584                                ora.job.id,
585                                ora.job.job_type_id,
586                                ora.job.input_payload_json,
587                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
588                                ora.job.retry_policy_json,
589                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
590                            FROM
591                                ora.execution
592                            JOIN ora.job ON
593                                ora.execution.job_id = ora.job.id
594                            WHERE
595                                status = 0
596                                AND ora.job.target_execution_time <= NOW()
597                            ORDER BY
598                                ora.execution.id ASC
599                            LIMIT 1000
600                            "#,
601                        )
602                        .await?;
603
604                    tx.query(&stmt, &[]).await?
605                };
606
607                tx.commit().await?;
608
609                if rows.is_empty() {
610                    break;
611                }
612
613                let mut ready_executions = Vec::with_capacity(rows.len());
614
615                for row in rows {
616                    ready_executions.push(ReadyExecution {
617                        execution_id: ExecutionId(row.try_get(0)?),
618                        job_id: JobId(row.try_get(1)?),
619                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
620                        input_payload_json: row.try_get(3)?,
621                        attempt_number: row.try_get::<_, i64>(4)? as u64,
622                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
623                        target_execution_time: UNIX_EPOCH
624                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
625                    });
626                }
627
628                last_execution_id = ready_executions.last().map(|e| e.execution_id);
629
630                yield ready_executions;
631            }
632        })
633    }
634
635    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
636        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();
637
638        loop {
639            let mut conn = self.pool.get().await?;
640
641            let next_timestamp = {
642                let tx = conn.read_only_transaction().await?;
643
644                let stmt = tx
645                    .prepare(
646                        r#"--sql
647                        SELECT
648                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
649                        FROM
650                            ora.job
651                        JOIN ora.execution ON
652                            ora.execution.job_id = ora.job.id
653                        WHERE
654                            ora.execution.status = 0
655                            AND NOT (ora.execution.id = ANY($1::UUID[]))
656                        ORDER BY
657                            ora.job.target_execution_time ASC
658                        LIMIT 1
659                        "#,
660                    )
661                    .await?;
662
663                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;
664
665                tx.commit().await?;
666
667                match row {
668                    Some(row) => row
669                        .try_get::<_, Option<f64>>(0)?
670                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
671                    None => None,
672                }
673            };
674
675            drop(conn);
676
677            if let Some(next_timestamp) = next_timestamp {
678                let now = std::time::SystemTime::now();
679
680                if next_timestamp <= now {
681                    break;
682                }
683
684                tokio::time::sleep(
685                    next_timestamp
686                        .duration_since(now)
687                        .unwrap_or_else(|_| Duration::from_secs(0)),
688                )
689                .await;
690                break;
691            }
692
693            tokio::time::sleep(Duration::from_millis(500)).await;
694        }
695
696        Ok(())
697    }
698
699    fn in_progress_executions(
700        &self,
701    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
702        async_stream::try_stream!({
703            let mut last_execution_id: Option<ExecutionId> = None;
704
705            loop {
706                let mut conn = self.pool.get().await?;
707                let tx = conn.read_only_transaction().await?;
708
709                let rows = if let Some(last_execution_id) = last_execution_id {
710                    let stmt = tx
711                        .prepare(
712                            r#"--sql
713                            SELECT
714                                ora.execution.id,
715                                ora.job.id,
716                                ora.execution.executor_id,
717                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
718                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
719                                ora.job.timeout_policy_json,
720                                ora.job.retry_policy_json,
721                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
722                            FROM
723                                ora.execution
724                            JOIN ora.job ON
725                                ora.execution.job_id = ora.job.id
726                            WHERE
727                                status = 1
728                                AND ora.execution.id > $1::UUID
729                            ORDER BY
730                                ora.execution.id ASC
731                            LIMIT 1000
732                            "#,
733                        )
734                        .await?;
735
736                    tx.query(&stmt, &[&last_execution_id.0]).await?
737                } else {
738                    let stmt = tx
739                        .prepare(
740                            r#"--sql
741                            SELECT
742                                ora.execution.id,
743                                ora.job.id,
744                                ora.execution.executor_id,
745                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
746                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
747                                ora.job.timeout_policy_json,
748                                ora.job.retry_policy_json,
749                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
750                            FROM
751                                ora.execution
752                            JOIN ora.job ON
753                                ora.execution.job_id = ora.job.id
754                            WHERE
755                                status = 1
756                            ORDER BY
757                                ora.execution.id ASC
758                            LIMIT 1000
759                        "#,
760                        )
761                        .await?;
762
763                    tx.query(&stmt, &[]).await?
764                };
765
766                tx.commit().await?;
767
768                if rows.is_empty() {
769                    break;
770                }
771
772                let mut in_progress_executions = Vec::with_capacity(rows.len());
773
774                for row in rows {
775                    in_progress_executions.push(InProgressExecution {
776                        execution_id: ExecutionId(row.try_get(0)?),
777                        job_id: JobId(row.try_get(1)?),
778                        executor_id: ExecutorId(row.try_get(2)?),
779                        target_execution_time: UNIX_EPOCH
780                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
781                        started_at: UNIX_EPOCH
782                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
783                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
784                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
785                        attempt_number: row.try_get::<_, i64>(7)? as u64,
786                    });
787                }
788
789                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);
790
791                yield in_progress_executions;
792            }
793        })
794    }
795
796    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
797        if executions.is_empty() {
798            return Ok(());
799        }
800
801        let mut conn = self.pool.get().await?;
802
803        let tx = conn.transaction().await?;
804
805        let stmt = tx
806            .prepare(
807                r#"--sql
808                UPDATE ora.execution
809                SET
810                    executor_id = t.executor_id,
811                    started_at = to_timestamp(t.started_at)
812                FROM UNNEST(
813                    $1::UUID[],
814                    $2::UUID[],
815                    $3::DOUBLE PRECISION[]
816                ) AS t(execution_id, executor_id, started_at)
817                WHERE
818                    execution_id = id
819                    AND ora.execution.started_at IS NULL
820                "#,
821            )
822            .await?;
823
824        let mut col_execution_id = Vec::with_capacity(executions.len());
825        let mut col_executor_id = Vec::with_capacity(executions.len());
826        let mut col_started_at = Vec::with_capacity(executions.len());
827
828        for execution in executions {
829            col_execution_id.push(execution.execution_id.0);
830            col_executor_id.push(execution.executor_id.0);
831            col_started_at.push(
832                execution
833                    .started_at
834                    .duration_since(UNIX_EPOCH)
835                    .unwrap_or_default()
836                    .as_secs_f64(),
837            );
838        }
839
840        tx.execute(
841            &stmt,
842            &[&col_execution_id, &col_executor_id, &col_started_at],
843        )
844        .await?;
845
846        tx.commit().await?;
847
848        Ok(())
849    }
850
851    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
852        if executions.is_empty() {
853            return Ok(());
854        }
855
856        let mut conn = self.pool.get().await?;
857
858        let tx = conn.transaction().await?;
859
860        let stmt = tx
861            .prepare(
862                r#"--sql
863                UPDATE ora.execution
864                SET
865                    succeeded_at = to_timestamp(t.succeeded_at),
866                    output_json = t.output_json
867                FROM UNNEST(
868                    $1::UUID[],
869                    $2::DOUBLE PRECISION[],
870                    $3::TEXT[]
871                ) AS t(execution_id, succeeded_at, output_json)
872                WHERE
873                    execution_id = id
874                    AND status < 2
875                RETURNING job_id
876                "#,
877            )
878            .await?;
879
880        let mut col_execution_id = Vec::with_capacity(executions.len());
881        let mut col_succeeded_at = Vec::with_capacity(executions.len());
882        let mut col_output_json = Vec::with_capacity(executions.len());
883
884        for execution in executions {
885            col_execution_id.push(execution.execution_id.0);
886            col_succeeded_at.push(
887                execution
888                    .succeeded_at
889                    .duration_since(UNIX_EPOCH)
890                    .unwrap_or_default()
891                    .as_secs_f64(),
892            );
893            col_output_json.push(execution.output_json.as_str());
894        }
895
896        let rows = tx
897            .query(
898                &stmt,
899                &[&col_execution_id, &col_succeeded_at, &col_output_json],
900            )
901            .await?;
902
903        let mut job_ids = Vec::with_capacity(rows.len());
904        for row in rows {
905            job_ids.push(JobId(row.try_get(0)?));
906        }
907
908        mark_jobs_inactive(&tx, &job_ids).await?;
909
910        tx.commit().await?;
911
912        Ok(())
913    }
914
915    async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
916        if executions.is_empty() {
917            return Ok(());
918        }
919
920        let mut conn = self.pool.get().await?;
921        let tx = conn.transaction().await?;
922
923        let jobs = executions_failed(&tx, executions).await?;
924
925        mark_jobs_inactive(&tx, &jobs).await?;
926
927        tx.commit().await?;
928
929        Ok(())
930    }
931
932    async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
933        if executions.is_empty() {
934            return Ok(());
935        }
936
937        let mut conn = self.pool.get().await?;
938        let tx = conn.transaction().await?;
939
940        let job_ids = executions_failed(&tx, executions).await?;
941        add_executions(&tx, &job_ids).await?;
942
943        tx.commit().await?;
944
945        Ok(())
946    }
947
948    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
949        async_stream::try_stream!({
950            let mut last_schedule_id: Option<ScheduleId> = None;
951            loop {
952                let mut conn = self.pool.get().await?;
953                let tx = conn.read_only_transaction().await?;
954
955                let rows = if let Some(last_schedule_id) = last_schedule_id {
956                    let stmt = tx
957                        .prepare(
958                            r#"--sql
959                            SELECT
960                                id,
961                                EXTRACT(EPOCH FROM(
962                                    SELECT
963                                        target_execution_time
964                                    FROM
965                                        ora.job
966                                    WHERE
967                                        ora.job.schedule_id = ora.schedule.id
968                                    ORDER BY ora.job.id DESC
969                                    LIMIT 1
970                                ))::DOUBLE PRECISION,
971                                scheduling_policy_json,
972                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
973                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
974                                job_template_json
975                            FROM
976                                ora.schedule
977                            WHERE
978                                (
979                                    ora.schedule.stopped_at IS NULL
980                                    AND ora.schedule.active_job_id IS NULL
981                                )
982                                AND id > $1::UUID
983                            ORDER BY id ASC
984                            LIMIT 1000
985                            "#,
986                        )
987                        .await?;
988
989                    tx.query(&stmt, &[&last_schedule_id.0]).await?
990                } else {
991                    let stmt = tx
992                        .prepare(
993                            r#"--sql
994                            SELECT
995                                id,
996                                EXTRACT(EPOCH FROM(
997                                    SELECT
998                                        target_execution_time
999                                    FROM
1000                                        ora.job
1001                                    WHERE
1002                                        ora.job.schedule_id = ora.schedule.id
1003                                    ORDER BY ora.job.id DESC
1004                                    LIMIT 1
1005                                ))::DOUBLE PRECISION,
1006                                scheduling_policy_json,
1007                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
1008                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
1009                                job_template_json
1010                            FROM
1011                                ora.schedule
1012                            WHERE
1013                                ora.schedule.stopped_at IS NULL
1014                                AND ora.schedule.active_job_id IS NULL
1015                            ORDER BY id ASC
1016                            LIMIT 1000
1017                            "#,
1018                        )
1019                        .await?;
1020
1021                    tx.query(&stmt, &[]).await?
1022                };
1023
1024                tx.commit().await?;
1025
1026                if rows.is_empty() {
1027                    break;
1028                }
1029
1030                let mut schedules = Vec::with_capacity(rows.len());
1031
1032                for row in rows {
1033                    schedules.push(PendingSchedule {
1034                        schedule_id: ScheduleId(row.try_get(0)?),
1035                        last_target_execution_time: row
1036                            .try_get::<_, Option<f64>>(1)?
1037                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1038                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
1039                        time_range: TimeRange {
1040                            start: row
1041                                .try_get::<_, Option<f64>>(3)?
1042                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1043                            end: row
1044                                .try_get::<_, Option<f64>>(4)?
1045                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1046                        },
1047                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
1048                    });
1049                }
1050
1051                last_schedule_id = schedules.last().map(|s| s.schedule_id);
1052                yield schedules;
1053            }
1054        })
1055    }
1056
1057    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1058        let mut conn = self.pool.get().await?;
1059
1060        // delete in a loop so that we don't lock
1061        // tables for too long
1062        loop {
1063            let mut deleted_rows = 0;
1064
1065            {
1066                let tx = conn.transaction().await?;
1067
1068                let stmt = tx
1069                    .prepare(
1070                        r#"--sql
1071                        DELETE FROM ora.job
1072                        WHERE id IN (
1073                            SELECT ora.job.id
1074                            FROM ora.job
1075                            JOIN LATERAL (
1076                                SELECT
1077                                    status,
1078                                    succeeded_at,
1079                                    failed_at,
1080                                    cancelled_at
1081                                FROM
1082                                    ora.execution
1083                                WHERE
1084                                    ora.execution.job_id = ora.job.id
1085                                ORDER BY ora.execution.id DESC
1086                                LIMIT 1
1087                            ) e ON TRUE
1088                            WHERE
1089                                ora.job.inactive
1090                                AND (
1091                                    e.succeeded_at < to_timestamp($1)
1092                                    OR e.failed_at < to_timestamp($1)
1093                                    OR e.cancelled_at < to_timestamp($1)
1094                                )
1095                            LIMIT 25
1096                            FOR UPDATE SKIP LOCKED
1097                        );
1098                        "#,
1099                    )
1100                    .await?;
1101
1102                let deleted = tx
1103                    .execute(
1104                        &stmt,
1105                        &[&before
1106                            .duration_since(UNIX_EPOCH)
1107                            .unwrap_or_default()
1108                            .as_secs_f64()],
1109                    )
1110                    .await?;
1111
1112                deleted_rows += deleted;
1113
1114                tx.commit().await?;
1115            }
1116
1117            {
1118                let tx = conn.transaction().await?;
1119                let stmt = tx
1120                    .prepare(
1121                        r#"--sql
1122                        DELETE FROM ONLY ora.schedule
1123                        WHERE ctid IN (
1124                            SELECT ctid
1125                            FROM ora.schedule
1126                            WHERE 
1127                                ora.schedule.stopped_at < to_timestamp($1)
1128                            LIMIT 25
1129                            FOR UPDATE SKIP LOCKED
1130                        )
1131                        "#,
1132                    )
1133                    .await?;
1134
1135                let deleted = tx
1136                    .execute(
1137                        &stmt,
1138                        &[&before
1139                            .duration_since(UNIX_EPOCH)
1140                            .unwrap_or_default()
1141                            .as_secs_f64()],
1142                    )
1143                    .await?;
1144
1145                tx.commit().await?;
1146
1147                deleted_rows += deleted;
1148            }
1149
1150            {
1151                let tx = conn.transaction().await?;
1152
1153                let stmt = tx
1154                    .prepare(
1155                        r#"--sql
1156                        DELETE FROM ora.job_type
1157                        WHERE
1158                            NOT EXISTS (
1159                                SELECT FROM ora.job
1160                                WHERE
1161                                    ora.job.job_type_id = ora.job_type.id
1162                            )
1163                            AND NOT EXISTS (
1164                                SELECT FROM ora.schedule
1165                                WHERE
1166                                    ora.schedule.job_template_job_type_id = ora.job_type.id                                
1167                            )
1168                        "#,
1169                    )
1170                    .await?;
1171
1172                tx.execute(&stmt, &[]).await?;
1173
1174                tx.commit().await?;
1175            }
1176
1177            if deleted_rows == 0 {
1178                break;
1179            }
1180        }
1181
1182        Ok(())
1183    }
1184
1185    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1186        loop {
1187            let mut conn = self.pool.get().await?;
1188
1189            let has_pending = {
1190                let tx = conn.read_only_transaction().await?;
1191
1192                let stmt = tx
1193                    .prepare(
1194                        r#"--sql
1195                        SELECT 1 FROM ora.schedule
1196                        WHERE
1197                            ora.schedule.stopped_at IS NULL
1198                            AND ora.schedule.active_job_id IS NULL
1199                        LIMIT 1
1200                        "#,
1201                    )
1202                    .await?;
1203
1204                let row = tx.query_opt(&stmt, &[]).await?;
1205
1206                tx.commit().await?;
1207
1208                row.is_some()
1209            };
1210
1211            drop(conn);
1212
1213            if has_pending {
1214                break;
1215            }
1216
1217            tokio::time::sleep(Duration::from_millis(500)).await;
1218        }
1219
1220        Ok(())
1221    }
1222}
1223
1224async fn executions_failed(
1225    tx: &DbTransaction<'_>,
1226    executions: &[FailedExecution],
1227) -> Result<Vec<JobId>> {
1228    let stmt = tx
1229        .prepare(
1230            r#"--sql
1231            UPDATE ora.execution
1232            SET
1233                failed_at = to_timestamp(t.failed_at),
1234                failure_reason = t.failure_reason
1235            FROM UNNEST(
1236                $1::UUID[],
1237                $2::DOUBLE PRECISION[],
1238                $3::TEXT[]
1239            ) AS t(execution_id, failed_at, failure_reason)
1240            WHERE
1241                execution_id = id
1242                AND status < 2
1243            RETURNING job_id;
1244            "#,
1245        )
1246        .await?;
1247
1248    let mut col_execution_id = Vec::with_capacity(executions.len());
1249    let mut col_succeeded_at = Vec::with_capacity(executions.len());
1250    let mut col_failure_reason = Vec::with_capacity(executions.len());
1251
1252    for execution in executions {
1253        col_execution_id.push(execution.execution_id.0);
1254        col_succeeded_at.push(
1255            execution
1256                .failed_at
1257                .duration_since(UNIX_EPOCH)
1258                .unwrap_or_default()
1259                .as_secs_f64(),
1260        );
1261        col_failure_reason.push(execution.failure_reason.as_str());
1262    }
1263
1264    let rows = tx
1265        .query(
1266            &stmt,
1267            &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1268        )
1269        .await?;
1270
1271    let job_ids = rows.into_iter().map(|row| JobId(row.get(0))).collect();
1272
1273    Ok(job_ids)
1274}
1275
1276async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1277    if jobs.is_empty() {
1278        return Ok(());
1279    }
1280
1281    let mut col_job_id = Vec::with_capacity(jobs.len());
1282    let mut col_execution_id = Vec::with_capacity(jobs.len());
1283
1284    for job in jobs {
1285        col_job_id.push(job.0);
1286        col_execution_id.push(Uuid::now_v7());
1287    }
1288
1289    let stmt = tx
1290        .prepare(
1291            r#"--sql
1292            INSERT INTO ora.execution (
1293                id,
1294                job_id
1295            ) SELECT * FROM UNNEST(
1296                $1::UUID[],
1297                $2::UUID[]
1298            )
1299            "#,
1300        )
1301        .await?;
1302
1303    tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1304
1305    Ok(())
1306}
1307
1308async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1309    if jobs.is_empty() {
1310        return Ok(());
1311    }
1312
1313    let mut col_job_id = Vec::with_capacity(jobs.len());
1314
1315    for job in jobs {
1316        col_job_id.push(job.0);
1317    }
1318
1319    {
1320        let stmt = tx
1321            .prepare(
1322                r#"--sql
1323                UPDATE ora.job
1324                SET inactive = TRUE
1325                WHERE id = ANY($1::UUID[])
1326                "#,
1327            )
1328            .await?;
1329
1330        tx.execute(&stmt, &[&col_job_id]).await?;
1331    }
1332
1333    {
1334        let stmt = tx
1335            .prepare(
1336                r#"--sql
1337                UPDATE ora.schedule
1338                SET
1339                    active_job_id = NULL
1340                WHERE active_job_id = ANY($1::UUID[])
1341                "#,
1342            )
1343            .await?;
1344
1345        tx.execute(&stmt, &[&col_job_id]).await?;
1346    }
1347
1348    Ok(())
1349}