Skip to main content

ora_backend_postgres/
lib.rs

1#![allow(missing_docs)]
2
3use std::time::{Duration, UNIX_EPOCH};
4
5use deadpool_postgres::{Pool, PoolError};
6use futures::Stream;
7use ora_backend::{
8    Backend,
9    common::{NextPageToken, TimeRange},
10    executions::{
11        ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
12        SucceededExecution,
13    },
14    executors::ExecutorId,
15    jobs::{CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId, NewJob},
16    schedules::{
17        PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters, ScheduleId,
18        ScheduleOrderBy, StoppedSchedule,
19    },
20};
21
22use refinery::embed_migrations;
23use thiserror::Error;
24use uuid::Uuid;
25
26use crate::{
27    db::{DbPool, DbTransaction},
28    query::{
29        jobs::{cancel_jobs, job_count, job_exists},
30        schedules::{schedule_count, schedule_exists},
31    },
32};
33
34embed_migrations!("./migrations");
35
36mod db;
37mod models;
38mod query;
39
40pub struct PostgresBackend {
41    pool: DbPool,
42}
43
44type Result<T> = core::result::Result<T, Error>;
45
46/// Errors that can occur when using the Postgres backend.
47#[derive(Error, Debug)]
48pub enum Error {
49    #[error("{0}")]
50    Migrations(#[from] refinery::Error),
51    #[error("{0}")]
52    Pool(#[from] PoolError),
53    #[error("{0}")]
54    Postgres(#[from] tokio_postgres::Error),
55    #[error("{0}")]
56    Serde(#[from] serde_json::Error),
57    #[error("invalid page token: {0}")]
58    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
59}
60
61impl PostgresBackend {
62    /// Create and initialize a new Postgres backend
63    /// with the given connection pool.
64    pub async fn new(pool: Pool) -> Result<Self> {
65        let mut conn = pool.get().await?;
66        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
67        migrations::runner()
68            .set_migration_table_name("ora.migrations")
69            .run_async(&mut **conn)
70            .await?;
71        Ok(Self { pool: DbPool(pool) })
72    }
73}
74
75impl Backend for PostgresBackend {
76    type Error = Error;
77
78    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
79        if job_types.is_empty() {
80            return Ok(());
81        }
82
83        let mut conn = self.pool.get().await?;
84
85        let tx = conn.transaction().await?;
86
87        let mut col_id = Vec::with_capacity(job_types.len());
88        let mut col_description = Vec::with_capacity(job_types.len());
89        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
90        let mut col_output_schema_json = Vec::with_capacity(job_types.len());
91
92        for job_type in job_types {
93            col_id.push(job_type.id.as_str());
94            col_description.push(job_type.description.as_deref());
95            col_input_schema_json.push(job_type.input_schema_json.as_deref());
96            col_output_schema_json.push(job_type.output_schema_json.as_deref());
97        }
98
99        {
100            let stmt = tx
101                .prepare(
102                    r#"--sql
103                    INSERT INTO ora.job_type (
104                        id,
105                        description,
106                        input_schema_json,
107                        output_schema_json
108                    ) SELECT * FROM UNNEST(
109                        $1::TEXT[],
110                        $2::TEXT[],
111                        $3::TEXT[],
112                        $4::TEXT[]
113                    ) ON CONFLICT (id) DO UPDATE SET
114                        description = EXCLUDED.description,
115                        input_schema_json = EXCLUDED.input_schema_json,
116                        output_schema_json = EXCLUDED.output_schema_json
117                    WHERE
118                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
119                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
120                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
121                    "#,
122                )
123                .await?;
124
125            tx.execute(
126                &stmt,
127                &[
128                    &col_id,
129                    &col_description,
130                    &col_input_schema_json,
131                    &col_output_schema_json,
132                ],
133            )
134            .await?;
135        }
136
137        tx.commit().await?;
138
139        Ok(())
140    }
141
142    async fn list_job_types(&self) -> Result<Vec<JobType>> {
143        let mut conn = self.pool.get().await?;
144
145        let tx = conn.read_only_transaction().await?;
146
147        let job_types = {
148            let stmt = tx
149                .prepare(
150                    r#"--sql
151                    SELECT
152                        id,
153                        description,
154                        input_schema_json,
155                        output_schema_json
156                    FROM
157                        ora.job_type
158                    ORDER BY id
159                    "#,
160                )
161                .await?;
162
163            let rows = tx.query(&stmt, &[]).await?;
164
165            rows.into_iter()
166                .map(|row| {
167                    Result::<_>::Ok(JobType {
168                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
169                        description: row.try_get(1)?,
170                        input_schema_json: row.try_get(2)?,
171                        output_schema_json: row.try_get(3)?,
172                    })
173                })
174                .collect::<Result<Vec<_>>>()?
175        };
176
177        tx.commit().await?;
178
179        Ok(job_types)
180    }
181
182    async fn add_jobs(
183        &self,
184        jobs: &[NewJob],
185        if_not_exists: Option<JobFilters>,
186    ) -> Result<Vec<JobId>> {
187        if jobs.is_empty() {
188            return Ok(Vec::new());
189        }
190
191        let mut conn = self.pool.get().await?;
192
193        let tx = conn.transaction().await?;
194
195        if let Some(filters) = if_not_exists
196            && job_exists(&tx, filters).await?
197        {
198            tx.commit().await?;
199            return Ok(Vec::new());
200        }
201
202        let mut col_id = Vec::with_capacity(jobs.len());
203        let mut col_schedule_id = Vec::with_capacity(jobs.len());
204
205        {
206            let mut col_job_type_id = Vec::with_capacity(jobs.len());
207            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
208            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
209            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
210            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());
211
212            for job in jobs {
213                col_id.push(Uuid::now_v7());
214                col_job_type_id.push(job.job.job_type_id.as_str());
215                col_target_execution_time.push(job.job.target_execution_time);
216                col_input_payload_json.push(job.job.input_payload_json.as_str());
217                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
218                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
219                col_schedule_id.push(job.schedule_id.map(|s| s.0));
220            }
221
222            let stmt = tx
223                .prepare(
224                    r#"--sql
225                    INSERT INTO ora.job (
226                        id,
227                        job_type_id,
228                        target_execution_time,
229                        input_payload_json,
230                        timeout_policy_json,
231                        retry_policy_json,
232                        schedule_id
233                    ) SELECT * FROM UNNEST(
234                        $1::UUID[],
235                        $2::TEXT[],
236                        $3::TIMESTAMPTZ[],
237                        $4::TEXT[],
238                        $5::TEXT[],
239                        $6::TEXT[],
240                        $7::UUID[]
241                    )
242                    "#,
243                )
244                .await?;
245
246            tx.execute(
247                &stmt,
248                &[
249                    &col_id,
250                    &col_job_type_id,
251                    &col_target_execution_time,
252                    &col_input_payload_json,
253                    &col_timeout_policy_json,
254                    &col_retry_policy_json,
255                    &col_schedule_id,
256                ],
257            )
258            .await?;
259        }
260
261        {
262            let mut col_job_id = Vec::with_capacity(jobs.len());
263            let mut col_job_label_key = Vec::with_capacity(jobs.len());
264            let mut col_job_label_value = Vec::with_capacity(jobs.len());
265
266            for (i, job) in jobs.iter().enumerate() {
267                for label in &job.job.labels {
268                    col_job_id.push(col_id[i]);
269                    col_job_label_key.push(label.key.as_str());
270                    col_job_label_value.push(label.value.as_str());
271                }
272            }
273
274            let stmt = tx
275                .prepare(
276                    r#"--sql
277                    INSERT INTO ora.job_label (
278                        job_id,
279                        label_key,
280                        label_value
281                    ) SELECT * FROM UNNEST(
282                        $1::UUID[],
283                        $2::TEXT[],
284                        $3::TEXT[]
285                    )
286                    "#,
287                )
288                .await?;
289
290            tx.execute(
291                &stmt,
292                &[&col_job_id, &col_job_label_key, &col_job_label_value],
293            )
294            .await?;
295        }
296
297        {
298            let stmt = tx
299                .prepare(
300                    r#"--sql
301                    UPDATE ora.schedule
302                    SET
303                        active_job_id = t.job_id
304                    FROM UNNEST(
305                        $1::UUID[],
306                        $2::UUID[]
307                    ) AS t(job_id, schedule_id)
308                    WHERE
309                        ora.schedule.id = t.schedule_id
310                    "#,
311                )
312                .await?;
313
314            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
315        }
316
317        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
318        add_executions(&tx, &job_ids).await?;
319
320        tx.commit().await?;
321
322        Ok(job_ids)
323    }
324
325    async fn list_jobs(
326        &self,
327        filters: JobFilters,
328        order_by: Option<JobOrderBy>,
329        page_size: u32,
330        page_token: Option<NextPageToken>,
331    ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
332        let mut conn = self.pool.get().await?;
333        let tx = conn.read_only_transaction().await?;
334
335        let (jobs, next_page_token) = query::jobs::job_details(
336            &tx,
337            filters,
338            order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
339            page_size,
340            page_token.map(|t| t.0),
341        )
342        .await?;
343
344        tx.commit().await?;
345
346        Ok((jobs, next_page_token))
347    }
348
349    async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
350        let mut conn = self.pool.get().await?;
351        let tx = conn.read_only_transaction().await?;
352        let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
353        tx.commit().await?;
354        Ok(count)
355    }
356
357    async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
358        let mut conn = self.pool.get().await?;
359        let tx = conn.transaction().await?;
360        let jobs = cancel_jobs(&tx, filters).await?;
361
362        let job_ids = jobs.iter().map(|j| j.job_id).collect::<Vec<_>>();
363
364        mark_jobs_inactive(&tx, &job_ids).await?;
365
366        tx.commit().await?;
367
368        Ok(jobs)
369    }
370
371    async fn add_schedules(
372        &self,
373        schedules: &[ScheduleDefinition],
374        if_not_exists: Option<ScheduleFilters>,
375    ) -> Result<Vec<ScheduleId>> {
376        if schedules.is_empty() {
377            return Ok(Vec::new());
378        }
379
380        let mut conn = self.pool.get().await?;
381        let tx = conn.transaction().await?;
382
383        if let Some(filters) = if_not_exists
384            && schedule_exists(&tx, filters).await?
385        {
386            tx.commit().await?;
387            return Ok(Vec::new());
388        }
389
390        let mut col_id = Vec::with_capacity(schedules.len());
391        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
392        let mut col_job_template_json = Vec::with_capacity(schedules.len());
393        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
394        let mut col_start_after = Vec::with_capacity(schedules.len());
395        let mut col_end_before = Vec::with_capacity(schedules.len());
396
397        for schedule in schedules {
398            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);
399
400            col_id.push(Uuid::now_v7());
401            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
402            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
403            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
404            col_start_after.push(start_after);
405            col_end_before.push(end_before);
406        }
407
408        {
409            let stmt = tx
410                .prepare(
411                    r#"--sql
412                    INSERT INTO ora.schedule (
413                        id,
414                        job_template_job_type_id,
415                        job_template_json,
416                        scheduling_policy_json,
417                        start_after,
418                        end_before
419                    ) SELECT * FROM UNNEST(
420                        $1::UUID[],
421                        $2::TEXT[],
422                        $3::TEXT[],
423                        $4::TEXT[],
424                        $5::TIMESTAMPTZ[],
425                        $6::TIMESTAMPTZ[]
426                    )
427                    "#,
428                )
429                .await?;
430
431            tx.execute(
432                &stmt,
433                &[
434                    &col_id,
435                    &col_job_template_job_type_id,
436                    &col_job_template_json,
437                    &col_scheduling_policy_json,
438                    &col_start_after,
439                    &col_end_before,
440                ],
441            )
442            .await?;
443        }
444
445        tx.commit().await?;
446
447        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();
448
449        Ok(schedule_ids)
450    }
451
452    async fn list_schedules(
453        &self,
454        filters: ScheduleFilters,
455        order_by: Option<ScheduleOrderBy>,
456        page_size: u32,
457        page_token: Option<NextPageToken>,
458    ) -> Result<(
459        Vec<ScheduleDetails>,
460        Option<ora_backend::common::NextPageToken>,
461    )> {
462        let mut conn = self.pool.get().await?;
463        let tx = conn.read_only_transaction().await?;
464
465        let (schedules, next_page_token) = query::schedules::schedule_details(
466            &tx,
467            filters,
468            order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
469            page_size,
470            page_token.map(|t| t.0),
471        )
472        .await?;
473
474        tx.commit().await?;
475
476        Ok((schedules, next_page_token))
477    }
478
479    async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
480        let mut conn = self.pool.get().await?;
481        let tx = conn.read_only_transaction().await?;
482        let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
483        tx.commit().await?;
484        Ok(count)
485    }
486
487    async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
488        let mut conn = self.pool.get().await?;
489        let tx = conn.transaction().await?;
490        let schedules = query::schedules::stop_schedules(&tx, filters).await?;
491        tx.commit().await?;
492        Ok(schedules)
493    }
494
495    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
496        async_stream::try_stream!({
497            let mut last_execution_id: Option<ExecutionId> = None;
498
499            loop {
500                let mut conn = self.pool.get().await?;
501                let tx = conn.read_only_transaction().await?;
502
503                let rows = if let Some(last_execution_id) = last_execution_id {
504                    let stmt = tx
505                        .prepare(
506                            r#"--sql
507                            SELECT
508                                ora.execution.id,
509                                ora.job.id,
510                                ora.job.job_type_id,
511                                ora.job.input_payload_json,
512                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
513                                ora.job.retry_policy_json,
514                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
515                            FROM
516                                ora.execution
517                            JOIN ora.job ON
518                                ora.execution.job_id = ora.job.id
519                            WHERE
520                                status = 0
521                                AND ora.job.target_execution_time <= NOW()
522                                AND ora.execution.id > $1::UUID
523                            ORDER BY
524                                ora.execution.id ASC
525                            LIMIT 1000
526                            "#,
527                        )
528                        .await?;
529
530                    tx.query(&stmt, &[&last_execution_id.0]).await?
531                } else {
532                    let stmt = tx
533                        .prepare(
534                            r#"--sql
535                            SELECT
536                                ora.execution.id,
537                                ora.job.id,
538                                ora.job.job_type_id,
539                                ora.job.input_payload_json,
540                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
541                                ora.job.retry_policy_json,
542                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
543                            FROM
544                                ora.execution
545                            JOIN ora.job ON
546                                ora.execution.job_id = ora.job.id
547                            WHERE
548                                status = 0
549                                AND ora.job.target_execution_time <= NOW()
550                            ORDER BY
551                                ora.execution.id ASC
552                            LIMIT 1000
553                            "#,
554                        )
555                        .await?;
556
557                    tx.query(&stmt, &[]).await?
558                };
559
560                tx.commit().await?;
561
562                if rows.is_empty() {
563                    break;
564                }
565
566                let mut ready_executions = Vec::with_capacity(rows.len());
567
568                for row in rows {
569                    ready_executions.push(ReadyExecution {
570                        execution_id: ExecutionId(row.try_get(0)?),
571                        job_id: JobId(row.try_get(1)?),
572                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
573                        input_payload_json: row.try_get(3)?,
574                        attempt_number: row.try_get::<_, i64>(4)? as u64,
575                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
576                        target_execution_time: UNIX_EPOCH
577                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
578                    });
579                }
580
581                last_execution_id = ready_executions.last().map(|e| e.execution_id);
582
583                yield ready_executions;
584            }
585        })
586    }
587
588    async fn wait_for_ready_executions(&self, ignore: &[ExecutionId]) -> crate::Result<()> {
589        let ignored_executions = ignore.iter().map(|id| id.0).collect::<Vec<_>>();
590
591        loop {
592            let mut conn = self.pool.get().await?;
593
594            let next_timestamp = {
595                let tx = conn.read_only_transaction().await?;
596
597                let stmt = tx
598                    .prepare(
599                        r#"--sql
600                        SELECT
601                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
602                        FROM
603                            ora.job
604                        JOIN ora.execution ON
605                            ora.execution.job_id = ora.job.id
606                        WHERE
607                            ora.execution.status = 0
608                            AND NOT (ora.execution.id = ANY($1::UUID[]))
609                        ORDER BY
610                            ora.job.target_execution_time ASC
611                        LIMIT 1
612                        "#,
613                    )
614                    .await?;
615
616                let row = tx.query_opt(&stmt, &[&ignored_executions]).await?;
617
618                tx.commit().await?;
619
620                match row {
621                    Some(row) => row
622                        .try_get::<_, Option<f64>>(0)?
623                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
624                    None => None,
625                }
626            };
627
628            drop(conn);
629
630            if let Some(next_timestamp) = next_timestamp {
631                let now = std::time::SystemTime::now();
632
633                if next_timestamp <= now {
634                    break;
635                }
636
637                tokio::time::sleep(
638                    next_timestamp
639                        .duration_since(now)
640                        .unwrap_or_else(|_| Duration::from_secs(0)),
641                )
642                .await;
643                break;
644            }
645
646            tokio::time::sleep(Duration::from_millis(500)).await;
647        }
648
649        Ok(())
650    }
651
652    fn in_progress_executions(
653        &self,
654    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
655        async_stream::try_stream!({
656            let mut last_execution_id: Option<ExecutionId> = None;
657
658            loop {
659                let mut conn = self.pool.get().await?;
660                let tx = conn.read_only_transaction().await?;
661
662                let rows = if let Some(last_execution_id) = last_execution_id {
663                    let stmt = tx
664                        .prepare(
665                            r#"--sql
666                            SELECT
667                                ora.execution.id,
668                                ora.job.id,
669                                ora.execution.executor_id,
670                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
671                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
672                                ora.job.timeout_policy_json,
673                                ora.job.retry_policy_json,
674                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
675                            FROM
676                                ora.execution
677                            JOIN ora.job ON
678                                ora.execution.job_id = ora.job.id
679                            WHERE
680                                status = 1
681                                AND ora.execution.id > $1::UUID
682                            ORDER BY
683                                ora.execution.id ASC
684                            LIMIT 1000
685                            "#,
686                        )
687                        .await?;
688
689                    tx.query(&stmt, &[&last_execution_id.0]).await?
690                } else {
691                    let stmt = tx
692                        .prepare(
693                            r#"--sql
694                            SELECT
695                                ora.execution.id,
696                                ora.job.id,
697                                ora.execution.executor_id,
698                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
699                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
700                                ora.job.timeout_policy_json,
701                                ora.job.retry_policy_json,
702                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
703                            FROM
704                                ora.execution
705                            JOIN ora.job ON
706                                ora.execution.job_id = ora.job.id
707                            WHERE
708                                status = 1
709                            ORDER BY
710                                ora.execution.id ASC
711                            LIMIT 1000
712                        "#,
713                        )
714                        .await?;
715
716                    tx.query(&stmt, &[]).await?
717                };
718
719                tx.commit().await?;
720
721                if rows.is_empty() {
722                    break;
723                }
724
725                let mut in_progress_executions = Vec::with_capacity(rows.len());
726
727                for row in rows {
728                    in_progress_executions.push(InProgressExecution {
729                        execution_id: ExecutionId(row.try_get(0)?),
730                        job_id: JobId(row.try_get(1)?),
731                        executor_id: ExecutorId(row.try_get(2)?),
732                        target_execution_time: UNIX_EPOCH
733                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
734                        started_at: UNIX_EPOCH
735                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
736                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
737                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
738                        attempt_number: row.try_get::<_, i64>(7)? as u64,
739                    });
740                }
741
742                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);
743
744                yield in_progress_executions;
745            }
746        })
747    }
748
749    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
750        if executions.is_empty() {
751            return Ok(());
752        }
753
754        let mut conn = self.pool.get().await?;
755
756        let tx = conn.transaction().await?;
757
758        let stmt = tx
759            .prepare(
760                r#"--sql
761                UPDATE ora.execution
762                SET
763                    executor_id = t.executor_id,
764                    started_at = to_timestamp(t.started_at)
765                FROM UNNEST(
766                    $1::UUID[],
767                    $2::UUID[],
768                    $3::DOUBLE PRECISION[]
769                ) AS t(execution_id, executor_id, started_at)
770                WHERE
771                    execution_id = id
772                    AND ora.execution.started_at IS NULL
773                "#,
774            )
775            .await?;
776
777        let mut col_execution_id = Vec::with_capacity(executions.len());
778        let mut col_executor_id = Vec::with_capacity(executions.len());
779        let mut col_started_at = Vec::with_capacity(executions.len());
780
781        for execution in executions {
782            col_execution_id.push(execution.execution_id.0);
783            col_executor_id.push(execution.executor_id.0);
784            col_started_at.push(
785                execution
786                    .started_at
787                    .duration_since(UNIX_EPOCH)
788                    .unwrap_or_default()
789                    .as_secs_f64(),
790            );
791        }
792
793        tx.execute(
794            &stmt,
795            &[&col_execution_id, &col_executor_id, &col_started_at],
796        )
797        .await?;
798
799        tx.commit().await?;
800
801        Ok(())
802    }
803
804    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
805        if executions.is_empty() {
806            return Ok(());
807        }
808
809        let mut conn = self.pool.get().await?;
810
811        let tx = conn.transaction().await?;
812
813        let stmt = tx
814            .prepare(
815                r#"--sql
816                UPDATE ora.execution
817                SET
818                    succeeded_at = to_timestamp(t.succeeded_at),
819                    output_json = t.output_json
820                FROM UNNEST(
821                    $1::UUID[],
822                    $2::DOUBLE PRECISION[],
823                    $3::TEXT[]
824                ) AS t(execution_id, succeeded_at, output_json)
825                WHERE
826                    execution_id = id
827                    AND status < 2
828                RETURNING job_id
829                "#,
830            )
831            .await?;
832
833        let mut col_execution_id = Vec::with_capacity(executions.len());
834        let mut col_succeeded_at = Vec::with_capacity(executions.len());
835        let mut col_output_json = Vec::with_capacity(executions.len());
836
837        for execution in executions {
838            col_execution_id.push(execution.execution_id.0);
839            col_succeeded_at.push(
840                execution
841                    .succeeded_at
842                    .duration_since(UNIX_EPOCH)
843                    .unwrap_or_default()
844                    .as_secs_f64(),
845            );
846            col_output_json.push(execution.output_json.as_str());
847        }
848
849        let rows = tx
850            .query(
851                &stmt,
852                &[&col_execution_id, &col_succeeded_at, &col_output_json],
853            )
854            .await?;
855
856        let mut job_ids = Vec::with_capacity(rows.len());
857        for row in rows {
858            job_ids.push(JobId(row.try_get(0)?));
859        }
860
861        mark_jobs_inactive(&tx, &job_ids).await?;
862
863        tx.commit().await?;
864
865        Ok(())
866    }
867
868    async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
869        if executions.is_empty() {
870            return Ok(());
871        }
872
873        let mut conn = self.pool.get().await?;
874        let tx = conn.transaction().await?;
875
876        let jobs = executions_failed(&tx, executions).await?;
877
878        mark_jobs_inactive(&tx, &jobs).await?;
879
880        tx.commit().await?;
881
882        Ok(())
883    }
884
885    async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
886        if executions.is_empty() {
887            return Ok(());
888        }
889
890        let mut conn = self.pool.get().await?;
891        let tx = conn.transaction().await?;
892
893        let job_ids = executions_failed(&tx, executions).await?;
894        add_executions(&tx, &job_ids).await?;
895
896        tx.commit().await?;
897
898        Ok(())
899    }
900
901    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
902        async_stream::try_stream!({
903            let mut last_schedule_id: Option<ScheduleId> = None;
904            loop {
905                let mut conn = self.pool.get().await?;
906                let tx = conn.read_only_transaction().await?;
907
908                let rows = if let Some(last_schedule_id) = last_schedule_id {
909                    let stmt = tx
910                        .prepare(
911                            r#"--sql
912                            SELECT
913                                id,
914                                EXTRACT(EPOCH FROM(
915                                    SELECT
916                                        target_execution_time
917                                    FROM
918                                        ora.job
919                                    WHERE
920                                        ora.job.schedule_id = ora.schedule.id
921                                    ORDER BY ora.job.id DESC
922                                    LIMIT 1
923                                ))::DOUBLE PRECISION,
924                                scheduling_policy_json,
925                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
926                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
927                                job_template_json
928                            FROM
929                                ora.schedule
930                            WHERE
931                                (
932                                    ora.schedule.stopped_at IS NULL
933                                    AND ora.schedule.active_job_id IS NULL
934                                )
935                                AND id > $1::UUID
936                            ORDER BY id ASC
937                            LIMIT 1000
938                            "#,
939                        )
940                        .await?;
941
942                    tx.query(&stmt, &[&last_schedule_id.0]).await?
943                } else {
944                    let stmt = tx
945                        .prepare(
946                            r#"--sql
947                            SELECT
948                                id,
949                                EXTRACT(EPOCH FROM(
950                                    SELECT
951                                        target_execution_time
952                                    FROM
953                                        ora.job
954                                    WHERE
955                                        ora.job.schedule_id = ora.schedule.id
956                                    ORDER BY ora.job.id DESC
957                                    LIMIT 1
958                                ))::DOUBLE PRECISION,
959                                scheduling_policy_json,
960                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
961                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
962                                job_template_json
963                            FROM
964                                ora.schedule
965                            WHERE
966                                ora.schedule.stopped_at IS NULL
967                                AND ora.schedule.active_job_id IS NULL
968                            ORDER BY id ASC
969                            LIMIT 1000
970                            "#,
971                        )
972                        .await?;
973
974                    tx.query(&stmt, &[]).await?
975                };
976
977                tx.commit().await?;
978
979                if rows.is_empty() {
980                    break;
981                }
982
983                let mut schedules = Vec::with_capacity(rows.len());
984
985                for row in rows {
986                    schedules.push(PendingSchedule {
987                        schedule_id: ScheduleId(row.try_get(0)?),
988                        last_target_execution_time: row
989                            .try_get::<_, Option<f64>>(1)?
990                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
991                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
992                        time_range: TimeRange {
993                            start: row
994                                .try_get::<_, Option<f64>>(3)?
995                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
996                            end: row
997                                .try_get::<_, Option<f64>>(4)?
998                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
999                        },
1000                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
1001                    });
1002                }
1003
1004                last_schedule_id = schedules.last().map(|s| s.schedule_id);
1005                yield schedules;
1006            }
1007        })
1008    }
1009
1010    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1011        let mut conn = self.pool.get().await?;
1012
1013        // delete in a loop so that we don't lock
1014        // tables for too long
1015        loop {
1016            let mut deleted_rows = 0;
1017
1018            {
1019                let tx = conn.transaction().await?;
1020
1021                let stmt = tx
1022                    .prepare(
1023                        r#"--sql
1024                        DELETE FROM ora.job
1025                        WHERE id IN (
1026                            SELECT ora.job.id
1027                            FROM ora.job
1028                            JOIN LATERAL (
1029                                SELECT
1030                                    status,
1031                                    succeeded_at,
1032                                    failed_at,
1033                                    cancelled_at
1034                                FROM
1035                                    ora.execution
1036                                WHERE
1037                                    ora.execution.job_id = ora.job.id
1038                                ORDER BY ora.execution.id DESC
1039                                LIMIT 1
1040                            ) e ON TRUE
1041                            WHERE
1042                                ora.job.inactive
1043                                AND (
1044                                    e.succeeded_at < to_timestamp($1)
1045                                    OR e.failed_at < to_timestamp($1)
1046                                    OR e.cancelled_at < to_timestamp($1)
1047                                )
1048                            LIMIT 25
1049                            FOR UPDATE SKIP LOCKED
1050                        );
1051                        "#,
1052                    )
1053                    .await?;
1054
1055                let deleted = tx
1056                    .execute(
1057                        &stmt,
1058                        &[&before
1059                            .duration_since(UNIX_EPOCH)
1060                            .unwrap_or_default()
1061                            .as_secs_f64()],
1062                    )
1063                    .await?;
1064
1065                deleted_rows += deleted;
1066
1067                tx.commit().await?;
1068            }
1069
1070            {
1071                let tx = conn.transaction().await?;
1072                let stmt = tx
1073                    .prepare(
1074                        r#"--sql
1075                        DELETE FROM ONLY ora.schedule
1076                        WHERE ctid IN (
1077                            SELECT ctid
1078                            FROM ora.schedule
1079                            WHERE 
1080                                ora.schedule.stopped_at < to_timestamp($1)
1081                            LIMIT 25
1082                            FOR UPDATE SKIP LOCKED
1083                        )
1084                        "#,
1085                    )
1086                    .await?;
1087
1088                let deleted = tx
1089                    .execute(
1090                        &stmt,
1091                        &[&before
1092                            .duration_since(UNIX_EPOCH)
1093                            .unwrap_or_default()
1094                            .as_secs_f64()],
1095                    )
1096                    .await?;
1097
1098                tx.commit().await?;
1099
1100                deleted_rows += deleted;
1101            }
1102
1103            {
1104                let tx = conn.transaction().await?;
1105
1106                let stmt = tx
1107                    .prepare(
1108                        r#"--sql
1109                        DELETE FROM ora.job_type
1110                        WHERE
1111                            NOT EXISTS (
1112                                SELECT FROM ora.job
1113                                WHERE
1114                                    ora.job.job_type_id = ora.job_type.id
1115                            )
1116                            AND NOT EXISTS (
1117                                SELECT FROM ora.schedule
1118                                WHERE
1119                                    ora.schedule.job_template_job_type_id = ora.job_type.id                                
1120                            )
1121                        "#,
1122                    )
1123                    .await?;
1124
1125                tx.execute(&stmt, &[]).await?;
1126
1127                tx.commit().await?;
1128            }
1129
1130            if deleted_rows == 0 {
1131                break;
1132            }
1133        }
1134
1135        Ok(())
1136    }
1137
1138    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1139        loop {
1140            let mut conn = self.pool.get().await?;
1141
1142            let has_pending = {
1143                let tx = conn.read_only_transaction().await?;
1144
1145                let stmt = tx
1146                    .prepare(
1147                        r#"--sql
1148                        SELECT 1 FROM ora.schedule
1149                        WHERE
1150                            ora.schedule.stopped_at IS NULL
1151                            AND ora.schedule.active_job_id IS NULL
1152                        LIMIT 1
1153                        "#,
1154                    )
1155                    .await?;
1156
1157                let row = tx.query_opt(&stmt, &[]).await?;
1158
1159                tx.commit().await?;
1160
1161                row.is_some()
1162            };
1163
1164            drop(conn);
1165
1166            if has_pending {
1167                break;
1168            }
1169
1170            tokio::time::sleep(Duration::from_millis(500)).await;
1171        }
1172
1173        Ok(())
1174    }
1175}
1176
1177async fn executions_failed(
1178    tx: &DbTransaction<'_>,
1179    executions: &[FailedExecution],
1180) -> Result<Vec<JobId>> {
1181    let stmt = tx
1182        .prepare(
1183            r#"--sql
1184            UPDATE ora.execution
1185            SET
1186                failed_at = to_timestamp(t.failed_at),
1187                failure_reason = t.failure_reason
1188            FROM UNNEST(
1189                $1::UUID[],
1190                $2::DOUBLE PRECISION[],
1191                $3::TEXT[]
1192            ) AS t(execution_id, failed_at, failure_reason)
1193            WHERE
1194                execution_id = id
1195                AND status < 2
1196            RETURNING job_id;
1197            "#,
1198        )
1199        .await?;
1200
1201    let mut col_execution_id = Vec::with_capacity(executions.len());
1202    let mut col_succeeded_at = Vec::with_capacity(executions.len());
1203    let mut col_failure_reason = Vec::with_capacity(executions.len());
1204
1205    for execution in executions {
1206        col_execution_id.push(execution.execution_id.0);
1207        col_succeeded_at.push(
1208            execution
1209                .failed_at
1210                .duration_since(UNIX_EPOCH)
1211                .unwrap_or_default()
1212                .as_secs_f64(),
1213        );
1214        col_failure_reason.push(execution.failure_reason.as_str());
1215    }
1216
1217    let rows = tx
1218        .query(
1219            &stmt,
1220            &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1221        )
1222        .await?;
1223
1224    let job_ids = rows.into_iter().map(|row| JobId(row.get(0))).collect();
1225
1226    Ok(job_ids)
1227}
1228
1229async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1230    if jobs.is_empty() {
1231        return Ok(());
1232    }
1233
1234    let mut col_job_id = Vec::with_capacity(jobs.len());
1235    let mut col_execution_id = Vec::with_capacity(jobs.len());
1236
1237    for job in jobs {
1238        col_job_id.push(job.0);
1239        col_execution_id.push(Uuid::now_v7());
1240    }
1241
1242    let stmt = tx
1243        .prepare(
1244            r#"--sql
1245            INSERT INTO ora.execution (
1246                id,
1247                job_id
1248            ) SELECT * FROM UNNEST(
1249                $1::UUID[],
1250                $2::UUID[]
1251            )
1252            "#,
1253        )
1254        .await?;
1255
1256    tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1257
1258    Ok(())
1259}
1260
1261async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1262    if jobs.is_empty() {
1263        return Ok(());
1264    }
1265
1266    let mut col_job_id = Vec::with_capacity(jobs.len());
1267
1268    for job in jobs {
1269        col_job_id.push(job.0);
1270    }
1271
1272    {
1273        let stmt = tx
1274            .prepare(
1275                r#"--sql
1276                UPDATE ora.job
1277                SET inactive = TRUE
1278                WHERE id = ANY($1::UUID[])
1279                "#,
1280            )
1281            .await?;
1282
1283        tx.execute(&stmt, &[&col_job_id]).await?;
1284    }
1285
1286    {
1287        let stmt = tx
1288            .prepare(
1289                r#"--sql
1290                UPDATE ora.schedule
1291                SET
1292                    active_job_id = NULL
1293                WHERE active_job_id = ANY($1::UUID[])
1294                "#,
1295            )
1296            .await?;
1297
1298        tx.execute(&stmt, &[&col_job_id]).await?;
1299    }
1300
1301    Ok(())
1302}