Skip to main content

ora_backend_postgres/
lib.rs

1#![allow(missing_docs)]
2
3use std::time::{Duration, UNIX_EPOCH};
4
5use deadpool_postgres::{Pool, PoolError};
6use futures::Stream;
7use ora_backend::{
8    Backend,
9    common::{NextPageToken, TimeRange},
10    executions::{
11        ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
12        SucceededExecution,
13    },
14    executors::ExecutorId,
15    jobs::{
16        AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
17        NewJob,
18    },
19    schedules::{
20        AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
21        ScheduleId, ScheduleOrderBy, StoppedSchedule,
22    },
23};
24
25use refinery::embed_migrations;
26use thiserror::Error;
27use uuid::Uuid;
28
29use crate::{
30    db::{DbPool, DbTransaction},
31    query::{
32        jobs::{cancel_jobs, job_count},
33        schedules::schedule_count,
34    },
35};
36
37embed_migrations!("./migrations");
38
39mod db;
40mod models;
41mod query;
42
/// [`Backend`] implementation backed by a Postgres connection pool.
pub struct PostgresBackend {
    // Pool wrapper from which the backend methods obtain their connections.
    pool: DbPool,
}
46
/// Result alias whose error type is this crate's [`Error`].
type Result<T> = core::result::Result<T, Error>;
48
/// Errors that can occur when using the Postgres backend.
///
/// Every variant except [`Error::InvalidPageToken`] converts automatically
/// from its source error through `From`, so `?` can be used directly.
#[derive(Error, Debug)]
pub enum Error {
    /// An error reported by `refinery`.
    #[error("{0}")]
    Migrations(#[from] refinery::Error),
    /// An error reported by the `deadpool_postgres` connection pool.
    #[error("{0}")]
    Pool(#[from] PoolError),
    /// An error reported by `tokio_postgres`.
    #[error("{0}")]
    Postgres(#[from] tokio_postgres::Error),
    /// An error reported by `serde_json`.
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    /// A page token was rejected; the boxed error carries the reason.
    #[error("invalid page token: {0}")]
    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
}
63
64impl PostgresBackend {
65    /// Create and initialize a new Postgres backend
66    /// with the given connection pool.
67    pub async fn new(pool: Pool) -> Result<Self> {
68        let mut conn = pool.get().await?;
69        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
70        migrations::runner()
71            .set_migration_table_name("ora.migrations")
72            .run_async(&mut **conn)
73            .await?;
74        Ok(Self { pool: DbPool(pool) })
75    }
76}
77
78impl Backend for PostgresBackend {
79    type Error = Error;
80
81    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
82        if job_types.is_empty() {
83            return Ok(());
84        }
85
86        let mut conn = self.pool.get().await?;
87
88        let tx = conn.transaction().await?;
89
90        let mut col_id = Vec::with_capacity(job_types.len());
91        let mut col_description = Vec::with_capacity(job_types.len());
92        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
93        let mut col_output_schema_json = Vec::with_capacity(job_types.len());
94
95        for job_type in job_types {
96            col_id.push(job_type.id.as_str());
97            col_description.push(job_type.description.as_deref());
98            col_input_schema_json.push(job_type.input_schema_json.as_deref());
99            col_output_schema_json.push(job_type.output_schema_json.as_deref());
100        }
101
102        {
103            let stmt = tx
104                .prepare(
105                    r#"--sql
106                    INSERT INTO ora.job_type (
107                        id,
108                        description,
109                        input_schema_json,
110                        output_schema_json
111                    ) SELECT * FROM UNNEST(
112                        $1::TEXT[],
113                        $2::TEXT[],
114                        $3::TEXT[],
115                        $4::TEXT[]
116                    ) ON CONFLICT (id) DO UPDATE SET
117                        description = EXCLUDED.description,
118                        input_schema_json = EXCLUDED.input_schema_json,
119                        output_schema_json = EXCLUDED.output_schema_json
120                    WHERE
121                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
122                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
123                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
124                    "#,
125                )
126                .await?;
127
128            tx.execute(
129                &stmt,
130                &[
131                    &col_id,
132                    &col_description,
133                    &col_input_schema_json,
134                    &col_output_schema_json,
135                ],
136            )
137            .await?;
138        }
139
140        tx.commit().await?;
141
142        Ok(())
143    }
144
145    async fn list_job_types(&self) -> Result<Vec<JobType>> {
146        let mut conn = self.pool.get().await?;
147
148        let tx = conn.read_only_transaction().await?;
149
150        let job_types = {
151            let stmt = tx
152                .prepare(
153                    r#"--sql
154                    SELECT
155                        id,
156                        description,
157                        input_schema_json,
158                        output_schema_json
159                    FROM
160                        ora.job_type
161                    ORDER BY id
162                    "#,
163                )
164                .await?;
165
166            let rows = tx.query(&stmt, &[]).await?;
167
168            rows.into_iter()
169                .map(|row| {
170                    Result::<_>::Ok(JobType {
171                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
172                        description: row.try_get(1)?,
173                        input_schema_json: row.try_get(2)?,
174                        output_schema_json: row.try_get(3)?,
175                    })
176                })
177                .collect::<Result<Vec<_>>>()?
178        };
179
180        tx.commit().await?;
181
182        Ok(job_types)
183    }
184
185    async fn add_jobs(
186        &self,
187        jobs: &[NewJob],
188        if_not_exists: Option<JobFilters>,
189    ) -> Result<AddedJobs> {
190        if jobs.is_empty() {
191            return Ok(AddedJobs::Added(Vec::new()));
192        }
193
194        let mut conn = self.pool.get().await?;
195
196        let tx = conn.transaction().await?;
197
198        if let Some(filters) = if_not_exists {
199            let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;
200
201            if !existing_job_ids.is_empty() {
202                tx.commit().await?;
203                return Ok(AddedJobs::Existing(existing_job_ids));
204            }
205        }
206
207        let mut col_id = Vec::with_capacity(jobs.len());
208        let mut col_schedule_id = Vec::with_capacity(jobs.len());
209
210        {
211            let mut col_job_type_id = Vec::with_capacity(jobs.len());
212            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
213            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
214            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
215            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());
216
217            for job in jobs {
218                col_id.push(Uuid::now_v7());
219                col_job_type_id.push(job.job.job_type_id.as_str());
220                col_target_execution_time.push(job.job.target_execution_time);
221                col_input_payload_json.push(job.job.input_payload_json.as_str());
222                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
223                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
224                col_schedule_id.push(job.schedule_id.map(|s| s.0));
225            }
226
227            let stmt = tx
228                .prepare(
229                    r#"--sql
230                    INSERT INTO ora.job (
231                        id,
232                        job_type_id,
233                        target_execution_time,
234                        input_payload_json,
235                        timeout_policy_json,
236                        retry_policy_json,
237                        schedule_id
238                    ) SELECT * FROM UNNEST(
239                        $1::UUID[],
240                        $2::TEXT[],
241                        $3::TIMESTAMPTZ[],
242                        $4::TEXT[],
243                        $5::TEXT[],
244                        $6::TEXT[],
245                        $7::UUID[]
246                    )
247                    "#,
248                )
249                .await?;
250
251            tx.execute(
252                &stmt,
253                &[
254                    &col_id,
255                    &col_job_type_id,
256                    &col_target_execution_time,
257                    &col_input_payload_json,
258                    &col_timeout_policy_json,
259                    &col_retry_policy_json,
260                    &col_schedule_id,
261                ],
262            )
263            .await?;
264        }
265
266        {
267            let mut col_job_id = Vec::with_capacity(jobs.len());
268            let mut col_job_label_key = Vec::with_capacity(jobs.len());
269            let mut col_job_label_value = Vec::with_capacity(jobs.len());
270
271            for (i, job) in jobs.iter().enumerate() {
272                for label in &job.job.labels {
273                    col_job_id.push(col_id[i]);
274                    col_job_label_key.push(label.key.as_str());
275                    col_job_label_value.push(label.value.as_str());
276                }
277            }
278
279            let stmt = tx
280                .prepare(
281                    r#"--sql
282                    INSERT INTO ora.job_label (
283                        job_id,
284                        label_key,
285                        label_value
286                    ) SELECT * FROM UNNEST(
287                        $1::UUID[],
288                        $2::TEXT[],
289                        $3::TEXT[]
290                    )
291                    "#,
292                )
293                .await?;
294
295            tx.execute(
296                &stmt,
297                &[&col_job_id, &col_job_label_key, &col_job_label_value],
298            )
299            .await?;
300        }
301
302        {
303            let stmt = tx
304                .prepare(
305                    r#"--sql
306                    UPDATE ora.schedule
307                    SET
308                        active_job_id = t.job_id
309                    FROM UNNEST(
310                        $1::UUID[],
311                        $2::UUID[]
312                    ) AS t(job_id, schedule_id)
313                    WHERE
314                        ora.schedule.id = t.schedule_id
315                    "#,
316                )
317                .await?;
318
319            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
320        }
321
322        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
323        add_executions(&tx, &job_ids).await?;
324
325        tx.commit().await?;
326
327        Ok(AddedJobs::Added(job_ids))
328    }
329
330    async fn list_jobs(
331        &self,
332        filters: JobFilters,
333        order_by: Option<JobOrderBy>,
334        page_size: u32,
335        page_token: Option<NextPageToken>,
336    ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
337        let mut conn = self.pool.get().await?;
338        let tx = conn.read_only_transaction().await?;
339
340        let (jobs, next_page_token) = query::jobs::job_details(
341            &tx,
342            filters,
343            order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
344            page_size,
345            page_token.map(|t| t.0),
346        )
347        .await?;
348
349        tx.commit().await?;
350
351        Ok((jobs, next_page_token))
352    }
353
354    async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
355        let mut conn = self.pool.get().await?;
356        let tx = conn.read_only_transaction().await?;
357        let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
358        tx.commit().await?;
359        Ok(count)
360    }
361
362    async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
363        let mut conn = self.pool.get().await?;
364        let tx = conn.transaction().await?;
365        let jobs = cancel_jobs(&tx, filters).await?;
366
367        let job_ids = jobs.iter().map(|j| j.job_id).collect::<Vec<_>>();
368
369        mark_jobs_inactive(&tx, &job_ids).await?;
370
371        tx.commit().await?;
372
373        Ok(jobs)
374    }
375
376    async fn add_schedules(
377        &self,
378        schedules: &[ScheduleDefinition],
379        if_not_exists: Option<ScheduleFilters>,
380    ) -> Result<AddedSchedules> {
381        if schedules.is_empty() {
382            return Ok(AddedSchedules::Added(Vec::new()));
383        }
384
385        let mut conn = self.pool.get().await?;
386        let tx = conn.transaction().await?;
387
388        if let Some(filters) = if_not_exists {
389            let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;
390
391            if !existing_schedule_ids.is_empty() {
392                tx.commit().await?;
393                return Ok(AddedSchedules::Existing(existing_schedule_ids));
394            }
395        }
396
397        let mut col_id = Vec::with_capacity(schedules.len());
398        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
399        let mut col_job_template_json = Vec::with_capacity(schedules.len());
400        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
401        let mut col_start_after = Vec::with_capacity(schedules.len());
402        let mut col_end_before = Vec::with_capacity(schedules.len());
403
404        for schedule in schedules {
405            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);
406
407            col_id.push(Uuid::now_v7());
408            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
409            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
410            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
411            col_start_after.push(start_after);
412            col_end_before.push(end_before);
413        }
414
415        {
416            let stmt = tx
417                .prepare(
418                    r#"--sql
419                    INSERT INTO ora.schedule (
420                        id,
421                        job_template_job_type_id,
422                        job_template_json,
423                        scheduling_policy_json,
424                        start_after,
425                        end_before
426                    ) SELECT * FROM UNNEST(
427                        $1::UUID[],
428                        $2::TEXT[],
429                        $3::TEXT[],
430                        $4::TEXT[],
431                        $5::TIMESTAMPTZ[],
432                        $6::TIMESTAMPTZ[]
433                    )
434                    "#,
435                )
436                .await?;
437
438            tx.execute(
439                &stmt,
440                &[
441                    &col_id,
442                    &col_job_template_job_type_id,
443                    &col_job_template_json,
444                    &col_scheduling_policy_json,
445                    &col_start_after,
446                    &col_end_before,
447                ],
448            )
449            .await?;
450        }
451
452        tx.commit().await?;
453
454        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();
455
456        Ok(AddedSchedules::Added(schedule_ids))
457    }
458
459    async fn list_schedules(
460        &self,
461        filters: ScheduleFilters,
462        order_by: Option<ScheduleOrderBy>,
463        page_size: u32,
464        page_token: Option<NextPageToken>,
465    ) -> Result<(
466        Vec<ScheduleDetails>,
467        Option<ora_backend::common::NextPageToken>,
468    )> {
469        let mut conn = self.pool.get().await?;
470        let tx = conn.read_only_transaction().await?;
471
472        let (schedules, next_page_token) = query::schedules::schedule_details(
473            &tx,
474            filters,
475            order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
476            page_size,
477            page_token.map(|t| t.0),
478        )
479        .await?;
480
481        tx.commit().await?;
482
483        Ok((schedules, next_page_token))
484    }
485
486    async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
487        let mut conn = self.pool.get().await?;
488        let tx = conn.read_only_transaction().await?;
489        let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
490        tx.commit().await?;
491        Ok(count)
492    }
493
494    async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
495        let mut conn = self.pool.get().await?;
496        let tx = conn.transaction().await?;
497        let schedules = query::schedules::stop_schedules(&tx, filters).await?;
498        tx.commit().await?;
499        Ok(schedules)
500    }
501
502    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
503        async_stream::try_stream!({
504            let mut last_execution_id: Option<ExecutionId> = None;
505
506            loop {
507                let mut conn = self.pool.get().await?;
508                let tx = conn.read_only_transaction().await?;
509
510                let rows = if let Some(last_execution_id) = last_execution_id {
511                    let stmt = tx
512                        .prepare(
513                            r#"--sql
514                            SELECT
515                                ora.execution.id,
516                                ora.job.id,
517                                ora.job.job_type_id,
518                                ora.job.input_payload_json,
519                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
520                                ora.job.retry_policy_json,
521                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
522                            FROM
523                                ora.execution
524                            JOIN ora.job ON
525                                ora.execution.job_id = ora.job.id
526                            WHERE
527                                status = 0
528                                AND ora.job.target_execution_time <= NOW()
529                                AND ora.execution.id > $1::UUID
530                            ORDER BY
531                                ora.execution.id ASC
532                            LIMIT 1000
533                            "#,
534                        )
535                        .await?;
536
537                    tx.query(&stmt, &[&last_execution_id.0]).await?
538                } else {
539                    let stmt = tx
540                        .prepare(
541                            r#"--sql
542                            SELECT
543                                ora.execution.id,
544                                ora.job.id,
545                                ora.job.job_type_id,
546                                ora.job.input_payload_json,
547                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
548                                ora.job.retry_policy_json,
549                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
550                            FROM
551                                ora.execution
552                            JOIN ora.job ON
553                                ora.execution.job_id = ora.job.id
554                            WHERE
555                                status = 0
556                                AND ora.job.target_execution_time <= NOW()
557                            ORDER BY
558                                ora.execution.id ASC
559                            LIMIT 1000
560                            "#,
561                        )
562                        .await?;
563
564                    tx.query(&stmt, &[]).await?
565                };
566
567                tx.commit().await?;
568
569                if rows.is_empty() {
570                    break;
571                }
572
573                let mut ready_executions = Vec::with_capacity(rows.len());
574
575                for row in rows {
576                    ready_executions.push(ReadyExecution {
577                        execution_id: ExecutionId(row.try_get(0)?),
578                        job_id: JobId(row.try_get(1)?),
579                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
580                        input_payload_json: row.try_get(3)?,
581                        attempt_number: row.try_get::<_, i64>(4)? as u64,
582                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
583                        target_execution_time: UNIX_EPOCH
584                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
585                    });
586                }
587
588                last_execution_id = ready_executions.last().map(|e| e.execution_id);
589
590                yield ready_executions;
591            }
592        })
593    }
594
595    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
596        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();
597
598        loop {
599            let mut conn = self.pool.get().await?;
600
601            let next_timestamp = {
602                let tx = conn.read_only_transaction().await?;
603
604                let stmt = tx
605                    .prepare(
606                        r#"--sql
607                        SELECT
608                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
609                        FROM
610                            ora.job
611                        JOIN ora.execution ON
612                            ora.execution.job_id = ora.job.id
613                        WHERE
614                            ora.execution.status = 0
615                            AND NOT (ora.execution.id = ANY($1::UUID[]))
616                        ORDER BY
617                            ora.job.target_execution_time ASC
618                        LIMIT 1
619                        "#,
620                    )
621                    .await?;
622
623                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;
624
625                tx.commit().await?;
626
627                match row {
628                    Some(row) => row
629                        .try_get::<_, Option<f64>>(0)?
630                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
631                    None => None,
632                }
633            };
634
635            drop(conn);
636
637            if let Some(next_timestamp) = next_timestamp {
638                let now = std::time::SystemTime::now();
639
640                if next_timestamp <= now {
641                    break;
642                }
643
644                tokio::time::sleep(
645                    next_timestamp
646                        .duration_since(now)
647                        .unwrap_or_else(|_| Duration::from_secs(0)),
648                )
649                .await;
650                break;
651            }
652
653            tokio::time::sleep(Duration::from_millis(500)).await;
654        }
655
656        Ok(())
657    }
658
659    fn in_progress_executions(
660        &self,
661    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
662        async_stream::try_stream!({
663            let mut last_execution_id: Option<ExecutionId> = None;
664
665            loop {
666                let mut conn = self.pool.get().await?;
667                let tx = conn.read_only_transaction().await?;
668
669                let rows = if let Some(last_execution_id) = last_execution_id {
670                    let stmt = tx
671                        .prepare(
672                            r#"--sql
673                            SELECT
674                                ora.execution.id,
675                                ora.job.id,
676                                ora.execution.executor_id,
677                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
678                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
679                                ora.job.timeout_policy_json,
680                                ora.job.retry_policy_json,
681                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
682                            FROM
683                                ora.execution
684                            JOIN ora.job ON
685                                ora.execution.job_id = ora.job.id
686                            WHERE
687                                status = 1
688                                AND ora.execution.id > $1::UUID
689                            ORDER BY
690                                ora.execution.id ASC
691                            LIMIT 1000
692                            "#,
693                        )
694                        .await?;
695
696                    tx.query(&stmt, &[&last_execution_id.0]).await?
697                } else {
698                    let stmt = tx
699                        .prepare(
700                            r#"--sql
701                            SELECT
702                                ora.execution.id,
703                                ora.job.id,
704                                ora.execution.executor_id,
705                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
706                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
707                                ora.job.timeout_policy_json,
708                                ora.job.retry_policy_json,
709                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
710                            FROM
711                                ora.execution
712                            JOIN ora.job ON
713                                ora.execution.job_id = ora.job.id
714                            WHERE
715                                status = 1
716                            ORDER BY
717                                ora.execution.id ASC
718                            LIMIT 1000
719                        "#,
720                        )
721                        .await?;
722
723                    tx.query(&stmt, &[]).await?
724                };
725
726                tx.commit().await?;
727
728                if rows.is_empty() {
729                    break;
730                }
731
732                let mut in_progress_executions = Vec::with_capacity(rows.len());
733
734                for row in rows {
735                    in_progress_executions.push(InProgressExecution {
736                        execution_id: ExecutionId(row.try_get(0)?),
737                        job_id: JobId(row.try_get(1)?),
738                        executor_id: ExecutorId(row.try_get(2)?),
739                        target_execution_time: UNIX_EPOCH
740                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
741                        started_at: UNIX_EPOCH
742                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
743                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
744                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
745                        attempt_number: row.try_get::<_, i64>(7)? as u64,
746                    });
747                }
748
749                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);
750
751                yield in_progress_executions;
752            }
753        })
754    }
755
756    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
757        if executions.is_empty() {
758            return Ok(());
759        }
760
761        let mut conn = self.pool.get().await?;
762
763        let tx = conn.transaction().await?;
764
765        let stmt = tx
766            .prepare(
767                r#"--sql
768                UPDATE ora.execution
769                SET
770                    executor_id = t.executor_id,
771                    started_at = to_timestamp(t.started_at)
772                FROM UNNEST(
773                    $1::UUID[],
774                    $2::UUID[],
775                    $3::DOUBLE PRECISION[]
776                ) AS t(execution_id, executor_id, started_at)
777                WHERE
778                    execution_id = id
779                    AND ora.execution.started_at IS NULL
780                "#,
781            )
782            .await?;
783
784        let mut col_execution_id = Vec::with_capacity(executions.len());
785        let mut col_executor_id = Vec::with_capacity(executions.len());
786        let mut col_started_at = Vec::with_capacity(executions.len());
787
788        for execution in executions {
789            col_execution_id.push(execution.execution_id.0);
790            col_executor_id.push(execution.executor_id.0);
791            col_started_at.push(
792                execution
793                    .started_at
794                    .duration_since(UNIX_EPOCH)
795                    .unwrap_or_default()
796                    .as_secs_f64(),
797            );
798        }
799
800        tx.execute(
801            &stmt,
802            &[&col_execution_id, &col_executor_id, &col_started_at],
803        )
804        .await?;
805
806        tx.commit().await?;
807
808        Ok(())
809    }
810
811    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
812        if executions.is_empty() {
813            return Ok(());
814        }
815
816        let mut conn = self.pool.get().await?;
817
818        let tx = conn.transaction().await?;
819
820        let stmt = tx
821            .prepare(
822                r#"--sql
823                UPDATE ora.execution
824                SET
825                    succeeded_at = to_timestamp(t.succeeded_at),
826                    output_json = t.output_json
827                FROM UNNEST(
828                    $1::UUID[],
829                    $2::DOUBLE PRECISION[],
830                    $3::TEXT[]
831                ) AS t(execution_id, succeeded_at, output_json)
832                WHERE
833                    execution_id = id
834                    AND status < 2
835                RETURNING job_id
836                "#,
837            )
838            .await?;
839
840        let mut col_execution_id = Vec::with_capacity(executions.len());
841        let mut col_succeeded_at = Vec::with_capacity(executions.len());
842        let mut col_output_json = Vec::with_capacity(executions.len());
843
844        for execution in executions {
845            col_execution_id.push(execution.execution_id.0);
846            col_succeeded_at.push(
847                execution
848                    .succeeded_at
849                    .duration_since(UNIX_EPOCH)
850                    .unwrap_or_default()
851                    .as_secs_f64(),
852            );
853            col_output_json.push(execution.output_json.as_str());
854        }
855
856        let rows = tx
857            .query(
858                &stmt,
859                &[&col_execution_id, &col_succeeded_at, &col_output_json],
860            )
861            .await?;
862
863        let mut job_ids = Vec::with_capacity(rows.len());
864        for row in rows {
865            job_ids.push(JobId(row.try_get(0)?));
866        }
867
868        mark_jobs_inactive(&tx, &job_ids).await?;
869
870        tx.commit().await?;
871
872        Ok(())
873    }
874
875    async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
876        if executions.is_empty() {
877            return Ok(());
878        }
879
880        let mut conn = self.pool.get().await?;
881        let tx = conn.transaction().await?;
882
883        let jobs = executions_failed(&tx, executions).await?;
884
885        mark_jobs_inactive(&tx, &jobs).await?;
886
887        tx.commit().await?;
888
889        Ok(())
890    }
891
892    async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
893        if executions.is_empty() {
894            return Ok(());
895        }
896
897        let mut conn = self.pool.get().await?;
898        let tx = conn.transaction().await?;
899
900        let job_ids = executions_failed(&tx, executions).await?;
901        add_executions(&tx, &job_ids).await?;
902
903        tx.commit().await?;
904
905        Ok(())
906    }
907
908    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
909        async_stream::try_stream!({
910            let mut last_schedule_id: Option<ScheduleId> = None;
911            loop {
912                let mut conn = self.pool.get().await?;
913                let tx = conn.read_only_transaction().await?;
914
915                let rows = if let Some(last_schedule_id) = last_schedule_id {
916                    let stmt = tx
917                        .prepare(
918                            r#"--sql
919                            SELECT
920                                id,
921                                EXTRACT(EPOCH FROM(
922                                    SELECT
923                                        target_execution_time
924                                    FROM
925                                        ora.job
926                                    WHERE
927                                        ora.job.schedule_id = ora.schedule.id
928                                    ORDER BY ora.job.id DESC
929                                    LIMIT 1
930                                ))::DOUBLE PRECISION,
931                                scheduling_policy_json,
932                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
933                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
934                                job_template_json
935                            FROM
936                                ora.schedule
937                            WHERE
938                                (
939                                    ora.schedule.stopped_at IS NULL
940                                    AND ora.schedule.active_job_id IS NULL
941                                )
942                                AND id > $1::UUID
943                            ORDER BY id ASC
944                            LIMIT 1000
945                            "#,
946                        )
947                        .await?;
948
949                    tx.query(&stmt, &[&last_schedule_id.0]).await?
950                } else {
951                    let stmt = tx
952                        .prepare(
953                            r#"--sql
954                            SELECT
955                                id,
956                                EXTRACT(EPOCH FROM(
957                                    SELECT
958                                        target_execution_time
959                                    FROM
960                                        ora.job
961                                    WHERE
962                                        ora.job.schedule_id = ora.schedule.id
963                                    ORDER BY ora.job.id DESC
964                                    LIMIT 1
965                                ))::DOUBLE PRECISION,
966                                scheduling_policy_json,
967                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
968                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
969                                job_template_json
970                            FROM
971                                ora.schedule
972                            WHERE
973                                ora.schedule.stopped_at IS NULL
974                                AND ora.schedule.active_job_id IS NULL
975                            ORDER BY id ASC
976                            LIMIT 1000
977                            "#,
978                        )
979                        .await?;
980
981                    tx.query(&stmt, &[]).await?
982                };
983
984                tx.commit().await?;
985
986                if rows.is_empty() {
987                    break;
988                }
989
990                let mut schedules = Vec::with_capacity(rows.len());
991
992                for row in rows {
993                    schedules.push(PendingSchedule {
994                        schedule_id: ScheduleId(row.try_get(0)?),
995                        last_target_execution_time: row
996                            .try_get::<_, Option<f64>>(1)?
997                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
998                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
999                        time_range: TimeRange {
1000                            start: row
1001                                .try_get::<_, Option<f64>>(3)?
1002                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1003                            end: row
1004                                .try_get::<_, Option<f64>>(4)?
1005                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1006                        },
1007                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
1008                    });
1009                }
1010
1011                last_schedule_id = schedules.last().map(|s| s.schedule_id);
1012                yield schedules;
1013            }
1014        })
1015    }
1016
1017    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1018        let mut conn = self.pool.get().await?;
1019
1020        // delete in a loop so that we don't lock
1021        // tables for too long
1022        loop {
1023            let mut deleted_rows = 0;
1024
1025            {
1026                let tx = conn.transaction().await?;
1027
1028                let stmt = tx
1029                    .prepare(
1030                        r#"--sql
1031                        DELETE FROM ora.job
1032                        WHERE id IN (
1033                            SELECT ora.job.id
1034                            FROM ora.job
1035                            JOIN LATERAL (
1036                                SELECT
1037                                    status,
1038                                    succeeded_at,
1039                                    failed_at,
1040                                    cancelled_at
1041                                FROM
1042                                    ora.execution
1043                                WHERE
1044                                    ora.execution.job_id = ora.job.id
1045                                ORDER BY ora.execution.id DESC
1046                                LIMIT 1
1047                            ) e ON TRUE
1048                            WHERE
1049                                ora.job.inactive
1050                                AND (
1051                                    e.succeeded_at < to_timestamp($1)
1052                                    OR e.failed_at < to_timestamp($1)
1053                                    OR e.cancelled_at < to_timestamp($1)
1054                                )
1055                            LIMIT 25
1056                            FOR UPDATE SKIP LOCKED
1057                        );
1058                        "#,
1059                    )
1060                    .await?;
1061
1062                let deleted = tx
1063                    .execute(
1064                        &stmt,
1065                        &[&before
1066                            .duration_since(UNIX_EPOCH)
1067                            .unwrap_or_default()
1068                            .as_secs_f64()],
1069                    )
1070                    .await?;
1071
1072                deleted_rows += deleted;
1073
1074                tx.commit().await?;
1075            }
1076
1077            {
1078                let tx = conn.transaction().await?;
1079                let stmt = tx
1080                    .prepare(
1081                        r#"--sql
1082                        DELETE FROM ONLY ora.schedule
1083                        WHERE ctid IN (
1084                            SELECT ctid
1085                            FROM ora.schedule
1086                            WHERE 
1087                                ora.schedule.stopped_at < to_timestamp($1)
1088                            LIMIT 25
1089                            FOR UPDATE SKIP LOCKED
1090                        )
1091                        "#,
1092                    )
1093                    .await?;
1094
1095                let deleted = tx
1096                    .execute(
1097                        &stmt,
1098                        &[&before
1099                            .duration_since(UNIX_EPOCH)
1100                            .unwrap_or_default()
1101                            .as_secs_f64()],
1102                    )
1103                    .await?;
1104
1105                tx.commit().await?;
1106
1107                deleted_rows += deleted;
1108            }
1109
1110            {
1111                let tx = conn.transaction().await?;
1112
1113                let stmt = tx
1114                    .prepare(
1115                        r#"--sql
1116                        DELETE FROM ora.job_type
1117                        WHERE
1118                            NOT EXISTS (
1119                                SELECT FROM ora.job
1120                                WHERE
1121                                    ora.job.job_type_id = ora.job_type.id
1122                            )
1123                            AND NOT EXISTS (
1124                                SELECT FROM ora.schedule
1125                                WHERE
1126                                    ora.schedule.job_template_job_type_id = ora.job_type.id                                
1127                            )
1128                        "#,
1129                    )
1130                    .await?;
1131
1132                tx.execute(&stmt, &[]).await?;
1133
1134                tx.commit().await?;
1135            }
1136
1137            if deleted_rows == 0 {
1138                break;
1139            }
1140        }
1141
1142        Ok(())
1143    }
1144
1145    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1146        loop {
1147            let mut conn = self.pool.get().await?;
1148
1149            let has_pending = {
1150                let tx = conn.read_only_transaction().await?;
1151
1152                let stmt = tx
1153                    .prepare(
1154                        r#"--sql
1155                        SELECT 1 FROM ora.schedule
1156                        WHERE
1157                            ora.schedule.stopped_at IS NULL
1158                            AND ora.schedule.active_job_id IS NULL
1159                        LIMIT 1
1160                        "#,
1161                    )
1162                    .await?;
1163
1164                let row = tx.query_opt(&stmt, &[]).await?;
1165
1166                tx.commit().await?;
1167
1168                row.is_some()
1169            };
1170
1171            drop(conn);
1172
1173            if has_pending {
1174                break;
1175            }
1176
1177            tokio::time::sleep(Duration::from_millis(500)).await;
1178        }
1179
1180        Ok(())
1181    }
1182}
1183
1184async fn executions_failed(
1185    tx: &DbTransaction<'_>,
1186    executions: &[FailedExecution],
1187) -> Result<Vec<JobId>> {
1188    let stmt = tx
1189        .prepare(
1190            r#"--sql
1191            UPDATE ora.execution
1192            SET
1193                failed_at = to_timestamp(t.failed_at),
1194                failure_reason = t.failure_reason
1195            FROM UNNEST(
1196                $1::UUID[],
1197                $2::DOUBLE PRECISION[],
1198                $3::TEXT[]
1199            ) AS t(execution_id, failed_at, failure_reason)
1200            WHERE
1201                execution_id = id
1202                AND status < 2
1203            RETURNING job_id;
1204            "#,
1205        )
1206        .await?;
1207
1208    let mut col_execution_id = Vec::with_capacity(executions.len());
1209    let mut col_succeeded_at = Vec::with_capacity(executions.len());
1210    let mut col_failure_reason = Vec::with_capacity(executions.len());
1211
1212    for execution in executions {
1213        col_execution_id.push(execution.execution_id.0);
1214        col_succeeded_at.push(
1215            execution
1216                .failed_at
1217                .duration_since(UNIX_EPOCH)
1218                .unwrap_or_default()
1219                .as_secs_f64(),
1220        );
1221        col_failure_reason.push(execution.failure_reason.as_str());
1222    }
1223
1224    let rows = tx
1225        .query(
1226            &stmt,
1227            &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1228        )
1229        .await?;
1230
1231    let job_ids = rows.into_iter().map(|row| JobId(row.get(0))).collect();
1232
1233    Ok(job_ids)
1234}
1235
1236async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1237    if jobs.is_empty() {
1238        return Ok(());
1239    }
1240
1241    let mut col_job_id = Vec::with_capacity(jobs.len());
1242    let mut col_execution_id = Vec::with_capacity(jobs.len());
1243
1244    for job in jobs {
1245        col_job_id.push(job.0);
1246        col_execution_id.push(Uuid::now_v7());
1247    }
1248
1249    let stmt = tx
1250        .prepare(
1251            r#"--sql
1252            INSERT INTO ora.execution (
1253                id,
1254                job_id
1255            ) SELECT * FROM UNNEST(
1256                $1::UUID[],
1257                $2::UUID[]
1258            )
1259            "#,
1260        )
1261        .await?;
1262
1263    tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1264
1265    Ok(())
1266}
1267
1268async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1269    if jobs.is_empty() {
1270        return Ok(());
1271    }
1272
1273    let mut col_job_id = Vec::with_capacity(jobs.len());
1274
1275    for job in jobs {
1276        col_job_id.push(job.0);
1277    }
1278
1279    {
1280        let stmt = tx
1281            .prepare(
1282                r#"--sql
1283                UPDATE ora.job
1284                SET inactive = TRUE
1285                WHERE id = ANY($1::UUID[])
1286                "#,
1287            )
1288            .await?;
1289
1290        tx.execute(&stmt, &[&col_job_id]).await?;
1291    }
1292
1293    {
1294        let stmt = tx
1295            .prepare(
1296                r#"--sql
1297                UPDATE ora.schedule
1298                SET
1299                    active_job_id = NULL
1300                WHERE active_job_id = ANY($1::UUID[])
1301                "#,
1302            )
1303            .await?;
1304
1305        tx.execute(&stmt, &[&col_job_id]).await?;
1306    }
1307
1308    Ok(())
1309}