Skip to main content

ora_backend_postgres/
lib.rs

1#![allow(missing_docs)]
2
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use deadpool_postgres::{Pool, PoolError};
6use futures::Stream;
7use ora_backend::{
8    Backend,
9    common::{NextPageToken, TimeRange},
10    executions::{
11        ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
12        SucceededExecution,
13    },
14    executors::ExecutorId,
15    jobs::{
16        AddedJobs, CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId,
17        NewJob,
18    },
19    schedules::{
20        AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
21        ScheduleId, ScheduleOrderBy, StoppedSchedule,
22    },
23};
24
25use refinery::embed_migrations;
26use thiserror::Error;
27use uuid::Uuid;
28
29use crate::{
30    db::{DbPool, DbTransaction},
31    query::{
32        jobs::{cancel_jobs, job_count},
33        schedules::schedule_count,
34    },
35};
36
37embed_migrations!("./migrations");
38
39mod db;
40mod models;
41mod query;
42
/// [`Backend`] implementation that talks to PostgreSQL through a connection pool.
pub struct PostgresBackend {
    // Pool wrapper from which every operation checks out its connection.
    pool: DbPool,
}
46
/// Crate-local result alias whose error type is this crate's [`Error`].
type Result<T> = core::result::Result<T, Error>;
48
/// Errors that can occur when using the Postgres backend.
///
/// All variants except [`Error::InvalidPageToken`] are created through `From`, so the
/// underlying error can be propagated with `?`; their display output is the
/// wrapped error's own message.
#[derive(Error, Debug)]
pub enum Error {
    /// An error reported by `refinery` (the migration runner).
    #[error("{0}")]
    Migrations(#[from] refinery::Error),
    /// An error reported by the `deadpool_postgres` connection pool.
    #[error("{0}")]
    Pool(#[from] PoolError),
    /// An error reported by `tokio_postgres`.
    #[error("{0}")]
    Postgres(#[from] tokio_postgres::Error),
    /// A `serde_json` serialization or deserialization error.
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    /// A page token was rejected; the boxed error carries the underlying cause.
    #[error("invalid page token: {0}")]
    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
}
63
64impl PostgresBackend {
65    /// Create and initialize a new Postgres backend
66    /// with the given connection pool.
67    pub async fn new(pool: Pool) -> Result<Self> {
68        let mut conn = pool.get().await?;
69        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
70        migrations::runner()
71            .set_migration_table_name("ora.migrations")
72            .run_async(&mut **conn)
73            .await?;
74        Ok(Self { pool: DbPool(pool) })
75    }
76}
77
78impl Backend for PostgresBackend {
79    type Error = Error;
80
81    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
82        if job_types.is_empty() {
83            return Ok(());
84        }
85
86        let mut conn = self.pool.get().await?;
87
88        let tx = conn.transaction().await?;
89
90        let mut col_id = Vec::with_capacity(job_types.len());
91        let mut col_description = Vec::with_capacity(job_types.len());
92        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
93        let mut col_output_schema_json = Vec::with_capacity(job_types.len());
94
95        for job_type in job_types {
96            col_id.push(job_type.id.as_str());
97            col_description.push(job_type.description.as_deref());
98            col_input_schema_json.push(job_type.input_schema_json.as_deref());
99            col_output_schema_json.push(job_type.output_schema_json.as_deref());
100        }
101
102        {
103            let stmt = tx
104                .prepare(
105                    r#"--sql
106                    INSERT INTO ora.job_type (
107                        id,
108                        description,
109                        input_schema_json,
110                        output_schema_json
111                    ) SELECT * FROM UNNEST(
112                        $1::TEXT[],
113                        $2::TEXT[],
114                        $3::TEXT[],
115                        $4::TEXT[]
116                    ) ON CONFLICT (id) DO UPDATE SET
117                        description = EXCLUDED.description,
118                        input_schema_json = EXCLUDED.input_schema_json,
119                        output_schema_json = EXCLUDED.output_schema_json
120                    WHERE
121                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
122                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
123                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
124                    "#,
125                )
126                .await?;
127
128            tx.execute(
129                &stmt,
130                &[
131                    &col_id,
132                    &col_description,
133                    &col_input_schema_json,
134                    &col_output_schema_json,
135                ],
136            )
137            .await?;
138        }
139
140        tx.commit().await?;
141
142        Ok(())
143    }
144
145    async fn list_job_types(&self) -> Result<Vec<JobType>> {
146        let mut conn = self.pool.get().await?;
147
148        let tx = conn.read_only_transaction().await?;
149
150        let job_types = {
151            let stmt = tx
152                .prepare(
153                    r#"--sql
154                    SELECT
155                        id,
156                        description,
157                        input_schema_json,
158                        output_schema_json
159                    FROM
160                        ora.job_type
161                    ORDER BY id
162                    "#,
163                )
164                .await?;
165
166            let rows = tx.query(&stmt, &[]).await?;
167
168            rows.into_iter()
169                .map(|row| {
170                    Result::<_>::Ok(JobType {
171                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
172                        description: row.try_get(1)?,
173                        input_schema_json: row.try_get(2)?,
174                        output_schema_json: row.try_get(3)?,
175                    })
176                })
177                .collect::<Result<Vec<_>>>()?
178        };
179
180        tx.commit().await?;
181
182        Ok(job_types)
183    }
184
185    async fn add_jobs(
186        &self,
187        jobs: &[NewJob],
188        if_not_exists: Option<JobFilters>,
189    ) -> Result<AddedJobs> {
190        if jobs.is_empty() {
191            return Ok(AddedJobs::Added(Vec::new()));
192        }
193
194        let mut conn = self.pool.get().await?;
195
196        let tx = conn.transaction().await?;
197
198        if let Some(filters) = if_not_exists {
199            let existing_job_ids = query::jobs::job_ids(&tx, filters).await?;
200
201            if !existing_job_ids.is_empty() {
202                tx.commit().await?;
203                return Ok(AddedJobs::Existing(existing_job_ids));
204            }
205        }
206
207        let mut col_id = Vec::with_capacity(jobs.len());
208        let mut col_schedule_id = Vec::with_capacity(jobs.len());
209
210        {
211            let mut col_job_type_id = Vec::with_capacity(jobs.len());
212            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
213            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
214            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
215            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());
216
217            for job in jobs {
218                col_id.push(Uuid::now_v7());
219                col_job_type_id.push(job.job.job_type_id.as_str());
220                col_target_execution_time.push(job.job.target_execution_time);
221                col_input_payload_json.push(job.job.input_payload_json.as_str());
222                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
223                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
224                col_schedule_id.push(job.schedule_id.map(|s| s.0));
225            }
226
227            let stmt = tx
228                .prepare(
229                    r#"--sql
230                    INSERT INTO ora.job (
231                        id,
232                        job_type_id,
233                        target_execution_time,
234                        input_payload_json,
235                        timeout_policy_json,
236                        retry_policy_json,
237                        schedule_id
238                    ) SELECT * FROM UNNEST(
239                        $1::UUID[],
240                        $2::TEXT[],
241                        $3::TIMESTAMPTZ[],
242                        $4::TEXT[],
243                        $5::TEXT[],
244                        $6::TEXT[],
245                        $7::UUID[]
246                    )
247                    "#,
248                )
249                .await?;
250
251            tx.execute(
252                &stmt,
253                &[
254                    &col_id,
255                    &col_job_type_id,
256                    &col_target_execution_time,
257                    &col_input_payload_json,
258                    &col_timeout_policy_json,
259                    &col_retry_policy_json,
260                    &col_schedule_id,
261                ],
262            )
263            .await?;
264        }
265
266        {
267            let mut col_job_id = Vec::with_capacity(jobs.len());
268            let mut col_job_label_key = Vec::with_capacity(jobs.len());
269            let mut col_job_label_value = Vec::with_capacity(jobs.len());
270
271            for (i, job) in jobs.iter().enumerate() {
272                for label in &job.job.labels {
273                    col_job_id.push(col_id[i]);
274                    col_job_label_key.push(label.key.as_str());
275                    col_job_label_value.push(label.value.as_str());
276                }
277            }
278
279            let stmt = tx
280                .prepare(
281                    r#"--sql
282                    INSERT INTO ora.job_label (
283                        job_id,
284                        label_key,
285                        label_value
286                    ) SELECT * FROM UNNEST(
287                        $1::UUID[],
288                        $2::TEXT[],
289                        $3::TEXT[]
290                    )
291                    "#,
292                )
293                .await?;
294
295            tx.execute(
296                &stmt,
297                &[&col_job_id, &col_job_label_key, &col_job_label_value],
298            )
299            .await?;
300        }
301
302        {
303            let stmt = tx
304                .prepare(
305                    r#"--sql
306                    UPDATE ora.schedule
307                    SET
308                        active_job_id = t.job_id
309                    FROM UNNEST(
310                        $1::UUID[],
311                        $2::UUID[]
312                    ) AS t(job_id, schedule_id)
313                    WHERE
314                        ora.schedule.id = t.schedule_id
315                    "#,
316                )
317                .await?;
318
319            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
320        }
321
322        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
323        add_executions(&tx, &job_ids).await?;
324
325        tx.commit().await?;
326
327        Ok(AddedJobs::Added(job_ids))
328    }
329
330    async fn list_jobs(
331        &self,
332        filters: JobFilters,
333        order_by: Option<JobOrderBy>,
334        page_size: u32,
335        page_token: Option<NextPageToken>,
336    ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
337        let mut conn = self.pool.get().await?;
338        let tx = conn.read_only_transaction().await?;
339
340        let (jobs, next_page_token) = query::jobs::job_details(
341            &tx,
342            filters,
343            order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
344            page_size,
345            page_token.map(|t| t.0),
346        )
347        .await?;
348
349        tx.commit().await?;
350
351        Ok((jobs, next_page_token))
352    }
353
354    async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
355        let mut conn = self.pool.get().await?;
356        let tx = conn.read_only_transaction().await?;
357        let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
358        tx.commit().await?;
359        Ok(count)
360    }
361
362    async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
363        let mut conn = self.pool.get().await?;
364        let tx = conn.transaction().await?;
365        let now = std::time::SystemTime::now();
366
367        let jobs = cancel_jobs(&tx, filters).await?;
368        mark_jobs_inactive(&tx, &jobs.iter().map(|j| (j.job_id, now)).collect::<Vec<_>>()).await?;
369        tx.commit().await?;
370
371        Ok(jobs)
372    }
373
374    async fn add_schedules(
375        &self,
376        schedules: &[ScheduleDefinition],
377        if_not_exists: Option<ScheduleFilters>,
378    ) -> Result<AddedSchedules> {
379        if schedules.is_empty() {
380            return Ok(AddedSchedules::Added(Vec::new()));
381        }
382
383        let mut conn = self.pool.get().await?;
384        let tx = conn.transaction().await?;
385
386        if let Some(filters) = if_not_exists {
387            let existing_schedule_ids = query::schedules::schedule_ids(&tx, filters).await?;
388
389            if !existing_schedule_ids.is_empty() {
390                tx.commit().await?;
391                return Ok(AddedSchedules::Existing(existing_schedule_ids));
392            }
393        }
394
395        let mut col_id = Vec::with_capacity(schedules.len());
396        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
397        let mut col_job_template_json = Vec::with_capacity(schedules.len());
398        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
399        let mut col_start_after = Vec::with_capacity(schedules.len());
400        let mut col_end_before = Vec::with_capacity(schedules.len());
401
402        for schedule in schedules {
403            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);
404
405            col_id.push(Uuid::now_v7());
406            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
407            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
408            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
409            col_start_after.push(start_after);
410            col_end_before.push(end_before);
411        }
412
413        {
414            let stmt = tx
415                .prepare(
416                    r#"--sql
417                    INSERT INTO ora.schedule (
418                        id,
419                        job_template_job_type_id,
420                        job_template_json,
421                        scheduling_policy_json,
422                        start_after,
423                        end_before
424                    ) SELECT * FROM UNNEST(
425                        $1::UUID[],
426                        $2::TEXT[],
427                        $3::TEXT[],
428                        $4::TEXT[],
429                        $5::TIMESTAMPTZ[],
430                        $6::TIMESTAMPTZ[]
431                    )
432                    "#,
433                )
434                .await?;
435
436            tx.execute(
437                &stmt,
438                &[
439                    &col_id,
440                    &col_job_template_job_type_id,
441                    &col_job_template_json,
442                    &col_scheduling_policy_json,
443                    &col_start_after,
444                    &col_end_before,
445                ],
446            )
447            .await?;
448        }
449
450        {
451            let mut col_schedule_id = Vec::with_capacity(schedules.len());
452            let mut col_schedule_label_key = Vec::with_capacity(schedules.len());
453            let mut col_schedule_label_value = Vec::with_capacity(schedules.len());
454
455            for (i, schedule) in schedules.iter().enumerate() {
456                for label in &schedule.labels {
457                    col_schedule_id.push(col_id[i]);
458                    col_schedule_label_key.push(label.key.as_str());
459                    col_schedule_label_value.push(label.value.as_str());
460                }
461            }
462
463            let stmt = tx
464                .prepare(
465                    r#"--sql
466                    INSERT INTO ora.schedule_label (
467                        schedule_id,
468                        label_key,
469                        label_value
470                    ) SELECT * FROM UNNEST(
471                        $1::UUID[],
472                        $2::TEXT[],
473                        $3::TEXT[]
474                    )
475                    "#,
476                )
477                .await?;
478
479            tx.execute(
480                &stmt,
481                &[
482                    &col_schedule_id,
483                    &col_schedule_label_key,
484                    &col_schedule_label_value,
485                ],
486            )
487            .await?;
488        }
489
490        tx.commit().await?;
491
492        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();
493
494        Ok(AddedSchedules::Added(schedule_ids))
495    }
496
497    async fn list_schedules(
498        &self,
499        filters: ScheduleFilters,
500        order_by: Option<ScheduleOrderBy>,
501        page_size: u32,
502        page_token: Option<NextPageToken>,
503    ) -> Result<(
504        Vec<ScheduleDetails>,
505        Option<ora_backend::common::NextPageToken>,
506    )> {
507        let mut conn = self.pool.get().await?;
508        let tx = conn.read_only_transaction().await?;
509
510        let (schedules, next_page_token) = query::schedules::schedule_details(
511            &tx,
512            filters,
513            order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
514            page_size,
515            page_token.map(|t| t.0),
516        )
517        .await?;
518
519        tx.commit().await?;
520
521        Ok((schedules, next_page_token))
522    }
523
524    async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
525        let mut conn = self.pool.get().await?;
526        let tx = conn.read_only_transaction().await?;
527        let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
528        tx.commit().await?;
529        Ok(count)
530    }
531
532    async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
533        let mut conn = self.pool.get().await?;
534        let tx = conn.transaction().await?;
535        let schedules = query::schedules::stop_schedules(&tx, filters).await?;
536        tx.commit().await?;
537        Ok(schedules)
538    }
539
540    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
541        async_stream::try_stream!({
542            let mut last_execution_id: Option<ExecutionId> = None;
543
544            loop {
545                let mut conn = self.pool.get().await?;
546                let tx = conn.read_only_transaction().await?;
547
548                let rows = if let Some(last_execution_id) = last_execution_id {
549                    let stmt = tx
550                        .prepare(
551                            r#"--sql
552                            SELECT
553                                ora.execution.id,
554                                ora.job.id,
555                                ora.job.job_type_id,
556                                ora.job.input_payload_json,
557                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
558                                ora.job.retry_policy_json,
559                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
560                            FROM
561                                ora.execution
562                            JOIN ora.job ON
563                                ora.execution.job_id = ora.job.id
564                            WHERE
565                                status = 0
566                                AND ora.job.target_execution_time <= NOW()
567                                AND ora.execution.id > $1::UUID
568                            ORDER BY
569                                ora.execution.id ASC
570                            LIMIT 1000
571                            "#,
572                        )
573                        .await?;
574
575                    tx.query(&stmt, &[&last_execution_id.0]).await?
576                } else {
577                    let stmt = tx
578                        .prepare(
579                            r#"--sql
580                            SELECT
581                                ora.execution.id,
582                                ora.job.id,
583                                ora.job.job_type_id,
584                                ora.job.input_payload_json,
585                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
586                                ora.job.retry_policy_json,
587                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
588                            FROM
589                                ora.execution
590                            JOIN ora.job ON
591                                ora.execution.job_id = ora.job.id
592                            WHERE
593                                status = 0
594                                AND ora.job.target_execution_time <= NOW()
595                            ORDER BY
596                                ora.execution.id ASC
597                            LIMIT 1000
598                            "#,
599                        )
600                        .await?;
601
602                    tx.query(&stmt, &[]).await?
603                };
604
605                tx.commit().await?;
606
607                if rows.is_empty() {
608                    break;
609                }
610
611                let mut ready_executions = Vec::with_capacity(rows.len());
612
613                for row in rows {
614                    ready_executions.push(ReadyExecution {
615                        execution_id: ExecutionId(row.try_get(0)?),
616                        job_id: JobId(row.try_get(1)?),
617                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
618                        input_payload_json: row.try_get(3)?,
619                        attempt_number: row.try_get::<_, i64>(4)? as u64,
620                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
621                        target_execution_time: UNIX_EPOCH
622                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
623                    });
624                }
625
626                last_execution_id = ready_executions.last().map(|e| e.execution_id);
627
628                yield ready_executions;
629            }
630        })
631    }
632
633    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
634        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();
635
636        loop {
637            let mut conn = self.pool.get().await?;
638
639            let next_timestamp = {
640                let tx = conn.read_only_transaction().await?;
641
642                let stmt = tx
643                    .prepare(
644                        r#"--sql
645                        SELECT
646                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
647                        FROM
648                            ora.job
649                        JOIN ora.execution ON
650                            ora.execution.job_id = ora.job.id
651                        WHERE
652                            ora.execution.status = 0
653                            AND NOT (ora.execution.id = ANY($1::UUID[]))
654                        ORDER BY
655                            ora.job.target_execution_time ASC
656                        LIMIT 1
657                        "#,
658                    )
659                    .await?;
660
661                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;
662
663                tx.commit().await?;
664
665                match row {
666                    Some(row) => row
667                        .try_get::<_, Option<f64>>(0)?
668                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
669                    None => None,
670                }
671            };
672
673            drop(conn);
674
675            if let Some(next_timestamp) = next_timestamp {
676                let now = std::time::SystemTime::now();
677
678                if next_timestamp <= now {
679                    break;
680                }
681
682                tokio::time::sleep(
683                    next_timestamp
684                        .duration_since(now)
685                        .unwrap_or_else(|_| Duration::from_secs(0)),
686                )
687                .await;
688                break;
689            }
690
691            tokio::time::sleep(Duration::from_millis(500)).await;
692        }
693
694        Ok(())
695    }
696
697    fn in_progress_executions(
698        &self,
699    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
700        async_stream::try_stream!({
701            let mut last_execution_id: Option<ExecutionId> = None;
702
703            loop {
704                let mut conn = self.pool.get().await?;
705                let tx = conn.read_only_transaction().await?;
706
707                let rows = if let Some(last_execution_id) = last_execution_id {
708                    let stmt = tx
709                        .prepare(
710                            r#"--sql
711                            SELECT
712                                ora.execution.id,
713                                ora.job.id,
714                                ora.execution.executor_id,
715                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
716                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
717                                ora.job.timeout_policy_json,
718                                ora.job.retry_policy_json,
719                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
720                            FROM
721                                ora.execution
722                            JOIN ora.job ON
723                                ora.execution.job_id = ora.job.id
724                            WHERE
725                                status = 1
726                                AND ora.execution.id > $1::UUID
727                            ORDER BY
728                                ora.execution.id ASC
729                            LIMIT 1000
730                            "#,
731                        )
732                        .await?;
733
734                    tx.query(&stmt, &[&last_execution_id.0]).await?
735                } else {
736                    let stmt = tx
737                        .prepare(
738                            r#"--sql
739                            SELECT
740                                ora.execution.id,
741                                ora.job.id,
742                                ora.execution.executor_id,
743                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
744                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
745                                ora.job.timeout_policy_json,
746                                ora.job.retry_policy_json,
747                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
748                            FROM
749                                ora.execution
750                            JOIN ora.job ON
751                                ora.execution.job_id = ora.job.id
752                            WHERE
753                                status = 1
754                            ORDER BY
755                                ora.execution.id ASC
756                            LIMIT 1000
757                        "#,
758                        )
759                        .await?;
760
761                    tx.query(&stmt, &[]).await?
762                };
763
764                tx.commit().await?;
765
766                if rows.is_empty() {
767                    break;
768                }
769
770                let mut in_progress_executions = Vec::with_capacity(rows.len());
771
772                for row in rows {
773                    in_progress_executions.push(InProgressExecution {
774                        execution_id: ExecutionId(row.try_get(0)?),
775                        job_id: JobId(row.try_get(1)?),
776                        executor_id: ExecutorId(row.try_get(2)?),
777                        target_execution_time: UNIX_EPOCH
778                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
779                        started_at: UNIX_EPOCH
780                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
781                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
782                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
783                        attempt_number: row.try_get::<_, i64>(7)? as u64,
784                    });
785                }
786
787                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);
788
789                yield in_progress_executions;
790            }
791        })
792    }
793
794    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
795        if executions.is_empty() {
796            return Ok(());
797        }
798
799        let mut conn = self.pool.get().await?;
800
801        let tx = conn.transaction().await?;
802
803        let stmt = tx
804            .prepare(
805                r#"--sql
806                UPDATE ora.execution
807                SET
808                    executor_id = t.executor_id,
809                    started_at = to_timestamp(t.started_at)
810                FROM UNNEST(
811                    $1::UUID[],
812                    $2::UUID[],
813                    $3::DOUBLE PRECISION[]
814                ) AS t(execution_id, executor_id, started_at)
815                WHERE
816                    execution_id = id
817                    AND ora.execution.started_at IS NULL
818                "#,
819            )
820            .await?;
821
822        let mut col_execution_id = Vec::with_capacity(executions.len());
823        let mut col_executor_id = Vec::with_capacity(executions.len());
824        let mut col_started_at = Vec::with_capacity(executions.len());
825
826        for execution in executions {
827            col_execution_id.push(execution.execution_id.0);
828            col_executor_id.push(execution.executor_id.0);
829            col_started_at.push(
830                execution
831                    .started_at
832                    .duration_since(UNIX_EPOCH)
833                    .unwrap_or_default()
834                    .as_secs_f64(),
835            );
836        }
837
838        tx.execute(
839            &stmt,
840            &[&col_execution_id, &col_executor_id, &col_started_at],
841        )
842        .await?;
843
844        tx.commit().await?;
845
846        Ok(())
847    }
848
849    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
850        if executions.is_empty() {
851            return Ok(());
852        }
853
854        let mut conn = self.pool.get().await?;
855
856        let tx = conn.transaction().await?;
857
858        let stmt = tx
859            .prepare(
860                r#"--sql
861                UPDATE ora.execution
862                SET
863                    succeeded_at = to_timestamp(t.succeeded_at),
864                    output_json = t.output_json
865                FROM UNNEST(
866                    $1::UUID[],
867                    $2::DOUBLE PRECISION[],
868                    $3::TEXT[]
869                ) AS t(execution_id, succeeded_at, output_json)
870                WHERE
871                    execution_id = id
872                    AND status < 2
873                RETURNING job_id, t.succeeded_at
874                "#,
875            )
876            .await?;
877
878        let mut col_execution_id = Vec::with_capacity(executions.len());
879        let mut col_succeeded_at = Vec::with_capacity(executions.len());
880        let mut col_output_json = Vec::with_capacity(executions.len());
881
882        for execution in executions {
883            col_execution_id.push(execution.execution_id.0);
884            col_succeeded_at.push(
885                execution
886                    .succeeded_at
887                    .duration_since(UNIX_EPOCH)
888                    .unwrap_or_default()
889                    .as_secs_f64(),
890            );
891            col_output_json.push(execution.output_json.as_str());
892        }
893
894        let rows = tx
895            .query(
896                &stmt,
897                &[&col_execution_id, &col_succeeded_at, &col_output_json],
898            )
899            .await?;
900
901        let mut jobs = Vec::with_capacity(rows.len());
902        for row in rows {
903            jobs.push((
904                JobId(row.try_get(0)?),
905                UNIX_EPOCH + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(1)?),
906            ));
907        }
908
909        mark_jobs_inactive(&tx, &jobs).await?;
910
911        tx.commit().await?;
912
913        Ok(())
914    }
915
916    async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
917        if executions.is_empty() {
918            return Ok(());
919        }
920
921        let mut conn = self.pool.get().await?;
922        let tx = conn.transaction().await?;
923
924        let jobs = executions_failed(&tx, executions).await?;
925
926        mark_jobs_inactive(&tx, &jobs).await?;
927
928        tx.commit().await?;
929
930        Ok(())
931    }
932
933    async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
934        if executions.is_empty() {
935            return Ok(());
936        }
937
938        let mut conn = self.pool.get().await?;
939        let tx = conn.transaction().await?;
940
941        let jobs = executions_failed(&tx, executions).await?;
942        add_executions(
943            &tx,
944            &jobs.into_iter().map(|(id, ..)| id).collect::<Vec<_>>(),
945        )
946        .await?;
947
948        tx.commit().await?;
949
950        Ok(())
951    }
952
953    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
954        async_stream::try_stream!({
955            let mut last_schedule_id: Option<ScheduleId> = None;
956            loop {
957                let mut conn = self.pool.get().await?;
958                let tx = conn.read_only_transaction().await?;
959
960                let rows = if let Some(last_schedule_id) = last_schedule_id {
961                    let stmt = tx
962                        .prepare(
963                            r#"--sql
964                            SELECT
965                                id,
966                                EXTRACT(EPOCH FROM(
967                                    SELECT
968                                        target_execution_time
969                                    FROM
970                                        ora.job
971                                    WHERE
972                                        ora.job.schedule_id = ora.schedule.id
973                                    ORDER BY ora.job.id DESC
974                                    LIMIT 1
975                                ))::DOUBLE PRECISION,
976                                scheduling_policy_json,
977                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
978                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
979                                job_template_json
980                            FROM
981                                ora.schedule
982                            WHERE
983                                (
984                                    ora.schedule.stopped_at IS NULL
985                                    AND ora.schedule.active_job_id IS NULL
986                                )
987                                AND id > $1::UUID
988                            ORDER BY id ASC
989                            LIMIT 1000
990                            "#,
991                        )
992                        .await?;
993
994                    tx.query(&stmt, &[&last_schedule_id.0]).await?
995                } else {
996                    let stmt = tx
997                        .prepare(
998                            r#"--sql
999                            SELECT
1000                                id,
1001                                EXTRACT(EPOCH FROM(
1002                                    SELECT
1003                                        target_execution_time
1004                                    FROM
1005                                        ora.job
1006                                    WHERE
1007                                        ora.job.schedule_id = ora.schedule.id
1008                                    ORDER BY ora.job.id DESC
1009                                    LIMIT 1
1010                                ))::DOUBLE PRECISION,
1011                                scheduling_policy_json,
1012                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
1013                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
1014                                job_template_json
1015                            FROM
1016                                ora.schedule
1017                            WHERE
1018                                ora.schedule.stopped_at IS NULL
1019                                AND ora.schedule.active_job_id IS NULL
1020                            ORDER BY id ASC
1021                            LIMIT 1000
1022                            "#,
1023                        )
1024                        .await?;
1025
1026                    tx.query(&stmt, &[]).await?
1027                };
1028
1029                tx.commit().await?;
1030
1031                if rows.is_empty() {
1032                    break;
1033                }
1034
1035                let mut schedules = Vec::with_capacity(rows.len());
1036
1037                for row in rows {
1038                    schedules.push(PendingSchedule {
1039                        schedule_id: ScheduleId(row.try_get(0)?),
1040                        last_target_execution_time: row
1041                            .try_get::<_, Option<f64>>(1)?
1042                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1043                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
1044                        time_range: TimeRange {
1045                            start: row
1046                                .try_get::<_, Option<f64>>(3)?
1047                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1048                            end: row
1049                                .try_get::<_, Option<f64>>(4)?
1050                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
1051                        },
1052                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
1053                    });
1054                }
1055
1056                last_schedule_id = schedules.last().map(|s| s.schedule_id);
1057                yield schedules;
1058            }
1059        })
1060    }
1061
1062    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1063        let mut conn = self.pool.get().await?;
1064
1065        // delete in a loop so that we don't lock
1066        // tables for too long
1067        loop {
1068            let mut deleted_rows = 0;
1069
1070            {
1071                let tx = conn.transaction().await?;
1072
1073                let stmt = tx
1074                    .prepare(
1075                        r#"--sql
1076                        DELETE FROM ora.job
1077                        WHERE id IN (
1078                            SELECT ora.job.id
1079                            FROM ora.job
1080                            WHERE
1081                                ora.job.inactive_since < to_timestamp($1)
1082                            LIMIT 25
1083                            FOR UPDATE SKIP LOCKED
1084                        );
1085                        "#,
1086                    )
1087                    .await?;
1088
1089                let deleted = tx
1090                    .execute(
1091                        &stmt,
1092                        &[&before
1093                            .duration_since(UNIX_EPOCH)
1094                            .unwrap_or_default()
1095                            .as_secs_f64()],
1096                    )
1097                    .await?;
1098
1099                deleted_rows += deleted;
1100
1101                tx.commit().await?;
1102            }
1103
1104            {
1105                let tx = conn.transaction().await?;
1106                let stmt = tx
1107                    .prepare(
1108                        r#"--sql
1109                        DELETE FROM ONLY ora.schedule
1110                        WHERE ctid IN (
1111                            SELECT ctid
1112                            FROM ora.schedule
1113                            WHERE 
1114                                ora.schedule.stopped_at < to_timestamp($1)
1115                            LIMIT 25
1116                            FOR UPDATE SKIP LOCKED
1117                        )
1118                        "#,
1119                    )
1120                    .await?;
1121
1122                let deleted = tx
1123                    .execute(
1124                        &stmt,
1125                        &[&before
1126                            .duration_since(UNIX_EPOCH)
1127                            .unwrap_or_default()
1128                            .as_secs_f64()],
1129                    )
1130                    .await?;
1131
1132                tx.commit().await?;
1133
1134                deleted_rows += deleted;
1135            }
1136
1137            {
1138                let tx = conn.transaction().await?;
1139
1140                let stmt = tx
1141                    .prepare(
1142                        r#"--sql
1143                        DELETE FROM ora.job_type
1144                        WHERE
1145                            NOT EXISTS (
1146                                SELECT FROM ora.job
1147                                WHERE
1148                                    ora.job.job_type_id = ora.job_type.id
1149                            )
1150                            AND NOT EXISTS (
1151                                SELECT FROM ora.schedule
1152                                WHERE
1153                                    ora.schedule.job_template_job_type_id = ora.job_type.id                                
1154                            )
1155                        "#,
1156                    )
1157                    .await?;
1158
1159                tx.execute(&stmt, &[]).await?;
1160
1161                tx.commit().await?;
1162            }
1163
1164            if deleted_rows == 0 {
1165                break;
1166            }
1167        }
1168
1169        Ok(())
1170    }
1171
1172    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1173        loop {
1174            let mut conn = self.pool.get().await?;
1175
1176            let has_pending = {
1177                let tx = conn.read_only_transaction().await?;
1178
1179                let stmt = tx
1180                    .prepare(
1181                        r#"--sql
1182                        SELECT 1 FROM ora.schedule
1183                        WHERE
1184                            ora.schedule.stopped_at IS NULL
1185                            AND ora.schedule.active_job_id IS NULL
1186                        LIMIT 1
1187                        "#,
1188                    )
1189                    .await?;
1190
1191                let row = tx.query_opt(&stmt, &[]).await?;
1192
1193                tx.commit().await?;
1194
1195                row.is_some()
1196            };
1197
1198            drop(conn);
1199
1200            if has_pending {
1201                break;
1202            }
1203
1204            tokio::time::sleep(Duration::from_millis(500)).await;
1205        }
1206
1207        Ok(())
1208    }
1209}
1210
1211async fn executions_failed(
1212    tx: &DbTransaction<'_>,
1213    executions: &[FailedExecution],
1214) -> Result<Vec<(JobId, SystemTime)>> {
1215    let stmt = tx
1216        .prepare(
1217            r#"--sql
1218            UPDATE ora.execution
1219            SET
1220                failed_at = to_timestamp(t.failed_at),
1221                failure_reason = t.failure_reason
1222            FROM UNNEST(
1223                $1::UUID[],
1224                $2::DOUBLE PRECISION[],
1225                $3::TEXT[]
1226            ) AS t(execution_id, failed_at, failure_reason)
1227            WHERE
1228                execution_id = id
1229                AND status < 2
1230            RETURNING job_id, t.failed_at
1231            "#,
1232        )
1233        .await?;
1234
1235    let mut col_execution_id = Vec::with_capacity(executions.len());
1236    let mut col_succeeded_at = Vec::with_capacity(executions.len());
1237    let mut col_failure_reason = Vec::with_capacity(executions.len());
1238
1239    for execution in executions {
1240        col_execution_id.push(execution.execution_id.0);
1241        col_succeeded_at.push(
1242            execution
1243                .failed_at
1244                .duration_since(UNIX_EPOCH)
1245                .unwrap_or_default()
1246                .as_secs_f64(),
1247        );
1248        col_failure_reason.push(execution.failure_reason.as_str());
1249    }
1250
1251    let rows = tx
1252        .query(
1253            &stmt,
1254            &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1255        )
1256        .await?;
1257
1258    let mut jobs = Vec::with_capacity(rows.len());
1259
1260    for row in rows {
1261        jobs.push((
1262            JobId(row.try_get(0)?),
1263            UNIX_EPOCH + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(1)?),
1264        ));
1265    }
1266
1267    Ok(jobs)
1268}
1269
1270async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1271    if jobs.is_empty() {
1272        return Ok(());
1273    }
1274
1275    let mut col_job_id = Vec::with_capacity(jobs.len());
1276    let mut col_execution_id = Vec::with_capacity(jobs.len());
1277
1278    for job in jobs {
1279        col_job_id.push(job.0);
1280        col_execution_id.push(Uuid::now_v7());
1281    }
1282
1283    let stmt = tx
1284        .prepare(
1285            r#"--sql
1286            INSERT INTO ora.execution (
1287                id,
1288                job_id
1289            ) SELECT * FROM UNNEST(
1290                $1::UUID[],
1291                $2::UUID[]
1292            )
1293            "#,
1294        )
1295        .await?;
1296
1297    tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1298
1299    Ok(())
1300}
1301
1302async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[(JobId, SystemTime)]) -> Result<()> {
1303    if jobs.is_empty() {
1304        return Ok(());
1305    }
1306
1307    let mut col_job_id = Vec::with_capacity(jobs.len());
1308    let mut col_inactive_since = Vec::with_capacity(jobs.len());
1309
1310    for (job, inactive_since) in jobs {
1311        col_job_id.push(job.0);
1312        col_inactive_since.push(
1313            inactive_since
1314                .duration_since(UNIX_EPOCH)
1315                .unwrap_or_default()
1316                .as_secs_f64(),
1317        );
1318    }
1319
1320    {
1321        let stmt = tx
1322            .prepare(
1323                r#"--sql
1324                UPDATE ora.job
1325                SET inactive_since = to_timestamp(t.inactive_since)
1326                FROM
1327                    UNNEST(
1328                        $1::UUID[],
1329                        $2::DOUBLE PRECISION[]
1330                    ) AS t(job_id, inactive_since)
1331                WHERE
1332                    id = t.job_id;
1333                "#,
1334            )
1335            .await?;
1336
1337        tx.execute(&stmt, &[&col_job_id, &col_inactive_since])
1338            .await?;
1339    }
1340
1341    {
1342        let stmt = tx
1343            .prepare(
1344                r#"--sql
1345                UPDATE ora.schedule
1346                SET
1347                    active_job_id = NULL
1348                WHERE active_job_id = ANY($1::UUID[])
1349                "#,
1350            )
1351            .await?;
1352
1353        tx.execute(&stmt, &[&col_job_id]).await?;
1354    }
1355
1356    Ok(())
1357}