Skip to main content

ora_backend_postgres/
lib.rs

1#![allow(missing_docs)]
2
3use std::time::{Duration, UNIX_EPOCH};
4
5use deadpool_postgres::{Pool, PoolError};
6use futures::Stream;
7use ora_backend::{
8    Backend,
9    common::{NextPageToken, TimeRange},
10    executions::{
11        ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
12        SucceededExecution,
13    },
14    executors::ExecutorId,
15    jobs::{CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, JobTypeId, NewJob},
16    schedules::{
17        PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters, ScheduleId,
18        ScheduleOrderBy, StoppedSchedule,
19    },
20};
21
22use refinery::embed_migrations;
23use thiserror::Error;
24use uuid::Uuid;
25
26use crate::{
27    db::{DbPool, DbTransaction},
28    query::{
29        jobs::{cancel_jobs, job_count, job_exists},
30        schedules::{schedule_count, schedule_exists},
31    },
32};
33
34embed_migrations!("./migrations");
35
36mod db;
37mod models;
38mod query;
39
40pub struct PostgresBackend {
41    pool: DbPool,
42}
43
44type Result<T> = core::result::Result<T, Error>;
45
46/// Errors that can occur when using the Postgres backend.
47#[derive(Error, Debug)]
48pub enum Error {
49    #[error("{0}")]
50    Migrations(#[from] refinery::Error),
51    #[error("{0}")]
52    Pool(#[from] PoolError),
53    #[error("{0}")]
54    Postgres(#[from] tokio_postgres::Error),
55    #[error("{0}")]
56    Serde(#[from] serde_json::Error),
57    #[error("invalid page token: {0}")]
58    InvalidPageToken(Box<dyn std::error::Error + Send + Sync>),
59}
60
61impl PostgresBackend {
62    /// Create and initialize a new Postgres backend
63    /// with the given connection pool.
64    pub async fn new(pool: Pool) -> Result<Self> {
65        let mut conn = pool.get().await?;
66        conn.execute("CREATE SCHEMA IF NOT EXISTS ora", &[]).await?;
67        migrations::runner()
68            .set_migration_table_name("ora.migrations")
69            .run_async(&mut **conn)
70            .await?;
71        Ok(Self { pool: DbPool(pool) })
72    }
73}
74
75impl Backend for PostgresBackend {
76    type Error = Error;
77
78    async fn add_job_types(&self, job_types: &[JobType]) -> Result<()> {
79        if job_types.is_empty() {
80            return Ok(());
81        }
82
83        let mut conn = self.pool.get().await?;
84
85        let tx = conn.transaction().await?;
86
87        let mut col_id = Vec::with_capacity(job_types.len());
88        let mut col_description = Vec::with_capacity(job_types.len());
89        let mut col_input_schema_json = Vec::with_capacity(job_types.len());
90        let mut col_output_schema_json = Vec::with_capacity(job_types.len());
91
92        for job_type in job_types {
93            col_id.push(job_type.id.as_str());
94            col_description.push(job_type.description.as_deref());
95            col_input_schema_json.push(job_type.input_schema_json.as_deref());
96            col_output_schema_json.push(job_type.output_schema_json.as_deref());
97        }
98
99        {
100            let stmt = tx
101                .prepare(
102                    r#"--sql
103                    INSERT INTO ora.job_type (
104                        id,
105                        description,
106                        input_schema_json,
107                        output_schema_json
108                    ) SELECT * FROM UNNEST(
109                        $1::TEXT[],
110                        $2::TEXT[],
111                        $3::TEXT[],
112                        $4::TEXT[]
113                    ) ON CONFLICT (id) DO UPDATE SET
114                        description = EXCLUDED.description,
115                        input_schema_json = EXCLUDED.input_schema_json,
116                        output_schema_json = EXCLUDED.output_schema_json
117                    WHERE
118                        ora.job_type.description IS DISTINCT FROM EXCLUDED.description
119                        OR ora.job_type.input_schema_json IS DISTINCT FROM EXCLUDED.input_schema_json
120                        OR ora.job_type.output_schema_json IS DISTINCT FROM EXCLUDED.output_schema_json
121                    "#,
122                )
123                .await?;
124
125            tx.execute(
126                &stmt,
127                &[
128                    &col_id,
129                    &col_description,
130                    &col_input_schema_json,
131                    &col_output_schema_json,
132                ],
133            )
134            .await?;
135        }
136
137        tx.commit().await?;
138
139        Ok(())
140    }
141
142    async fn list_job_types(&self) -> Result<Vec<JobType>> {
143        let mut conn = self.pool.get().await?;
144
145        let tx = conn.read_only_transaction().await?;
146
147        let job_types = {
148            let stmt = tx
149                .prepare(
150                    r#"--sql
151                    SELECT
152                        id,
153                        description,
154                        input_schema_json,
155                        output_schema_json
156                    FROM
157                        ora.job_type
158                    ORDER BY id
159                    "#,
160                )
161                .await?;
162
163            let rows = tx.query(&stmt, &[]).await?;
164
165            rows.into_iter()
166                .map(|row| {
167                    Result::<_>::Ok(JobType {
168                        id: JobTypeId::new_unchecked(row.try_get::<_, String>(0)?),
169                        description: row.try_get(1)?,
170                        input_schema_json: row.try_get(2)?,
171                        output_schema_json: row.try_get(3)?,
172                    })
173                })
174                .collect::<Result<Vec<_>>>()?
175        };
176
177        tx.commit().await?;
178
179        Ok(job_types)
180    }
181
182    async fn add_jobs(
183        &self,
184        jobs: &[NewJob],
185        if_not_exists: Option<JobFilters>,
186    ) -> Result<Vec<JobId>> {
187        if jobs.is_empty() {
188            return Ok(Vec::new());
189        }
190
191        let mut conn = self.pool.get().await?;
192
193        let tx = conn.transaction().await?;
194
195        if let Some(filters) = if_not_exists
196            && job_exists(&tx, filters).await?
197        {
198            tx.commit().await?;
199            return Ok(Vec::new());
200        }
201
202        let mut col_id = Vec::with_capacity(jobs.len());
203        let mut col_schedule_id = Vec::with_capacity(jobs.len());
204
205        {
206            let mut col_job_type_id = Vec::with_capacity(jobs.len());
207            let mut col_target_execution_time = Vec::with_capacity(jobs.len());
208            let mut col_input_payload_json = Vec::with_capacity(jobs.len());
209            let mut col_timeout_policy_json = Vec::with_capacity(jobs.len());
210            let mut col_retry_policy_json = Vec::with_capacity(jobs.len());
211
212            for job in jobs {
213                col_id.push(Uuid::now_v7());
214                col_job_type_id.push(job.job.job_type_id.as_str());
215                col_target_execution_time.push(job.job.target_execution_time);
216                col_input_payload_json.push(job.job.input_payload_json.as_str());
217                col_timeout_policy_json.push(serde_json::to_string(&job.job.timeout_policy)?);
218                col_retry_policy_json.push(serde_json::to_string(&job.job.retry_policy)?);
219                col_schedule_id.push(job.schedule_id.map(|s| s.0));
220            }
221
222            let stmt = tx
223                .prepare(
224                    r#"--sql
225                    INSERT INTO ora.job (
226                        id,
227                        job_type_id,
228                        target_execution_time,
229                        input_payload_json,
230                        timeout_policy_json,
231                        retry_policy_json,
232                        schedule_id
233                    ) SELECT * FROM UNNEST(
234                        $1::UUID[],
235                        $2::TEXT[],
236                        $3::TIMESTAMPTZ[],
237                        $4::TEXT[],
238                        $5::TEXT[],
239                        $6::TEXT[],
240                        $7::UUID[]
241                    )
242                    "#,
243                )
244                .await?;
245
246            tx.execute(
247                &stmt,
248                &[
249                    &col_id,
250                    &col_job_type_id,
251                    &col_target_execution_time,
252                    &col_input_payload_json,
253                    &col_timeout_policy_json,
254                    &col_retry_policy_json,
255                    &col_schedule_id,
256                ],
257            )
258            .await?;
259        }
260
261        {
262            let mut col_job_id = Vec::with_capacity(jobs.len());
263            let mut col_job_label_key = Vec::with_capacity(jobs.len());
264            let mut col_job_label_value = Vec::with_capacity(jobs.len());
265
266            for (i, job) in jobs.iter().enumerate() {
267                for label in &job.job.labels {
268                    col_job_id.push(col_id[i]);
269                    col_job_label_key.push(label.key.as_str());
270                    col_job_label_value.push(label.value.as_str());
271                }
272            }
273
274            let stmt = tx
275                .prepare(
276                    r#"--sql
277                    INSERT INTO ora.job_label (
278                        job_id,
279                        label_key,
280                        label_value
281                    ) SELECT * FROM UNNEST(
282                        $1::UUID[],
283                        $2::TEXT[],
284                        $3::TEXT[]
285                    )
286                    "#,
287                )
288                .await?;
289
290            tx.execute(
291                &stmt,
292                &[&col_job_id, &col_job_label_key, &col_job_label_value],
293            )
294            .await?;
295        }
296
297        {
298            let stmt = tx
299                .prepare(
300                    r#"--sql
301                    UPDATE ora.schedule
302                    SET
303                        active_job_id = t.job_id
304                    FROM UNNEST(
305                        $1::UUID[],
306                        $2::UUID[]
307                    ) AS t(job_id, schedule_id)
308                    WHERE
309                        ora.schedule.id = t.schedule_id
310                    "#,
311                )
312                .await?;
313
314            tx.execute(&stmt, &[&col_id, &col_schedule_id]).await?;
315        }
316
317        let job_ids = col_id.into_iter().map(Into::into).collect::<Vec<_>>();
318        add_executions(&tx, &job_ids).await?;
319
320        tx.commit().await?;
321
322        Ok(job_ids)
323    }
324
325    async fn list_jobs(
326        &self,
327        filters: JobFilters,
328        order_by: Option<JobOrderBy>,
329        page_size: u32,
330        page_token: Option<NextPageToken>,
331    ) -> Result<(Vec<JobDetails>, Option<ora_backend::common::NextPageToken>)> {
332        let mut conn = self.pool.get().await?;
333        let tx = conn.read_only_transaction().await?;
334
335        let (jobs, next_page_token) = query::jobs::job_details(
336            &tx,
337            filters,
338            order_by.unwrap_or(JobOrderBy::CreatedAtAsc),
339            page_size,
340            page_token.map(|t| t.0),
341        )
342        .await?;
343
344        tx.commit().await?;
345
346        Ok((jobs, next_page_token))
347    }
348
349    async fn count_jobs(&self, filters: JobFilters) -> Result<u64> {
350        let mut conn = self.pool.get().await?;
351        let tx = conn.read_only_transaction().await?;
352        let count = u64::try_from(job_count(&tx, filters).await?).unwrap_or_default();
353        tx.commit().await?;
354        Ok(count)
355    }
356
357    async fn cancel_jobs(&self, filters: JobFilters) -> Result<Vec<CancelledJob>> {
358        let mut conn = self.pool.get().await?;
359        let tx = conn.transaction().await?;
360        let jobs = cancel_jobs(&tx, filters).await?;
361
362        let job_ids = jobs.iter().map(|j| j.job_id).collect::<Vec<_>>();
363
364        mark_jobs_inactive(&tx, &job_ids).await?;
365
366        tx.commit().await?;
367
368        Ok(jobs)
369    }
370
371    async fn add_schedules(
372        &self,
373        schedules: &[ScheduleDefinition],
374        if_not_exists: Option<ScheduleFilters>,
375    ) -> Result<Vec<ScheduleId>> {
376        if schedules.is_empty() {
377            return Ok(Vec::new());
378        }
379
380        let mut conn = self.pool.get().await?;
381        let tx = conn.transaction().await?;
382
383        if let Some(filters) = if_not_exists
384            && schedule_exists(&tx, filters).await?
385        {
386            tx.commit().await?;
387            return Ok(Vec::new());
388        }
389
390        let mut col_id = Vec::with_capacity(schedules.len());
391        let mut col_job_template_job_type_id = Vec::with_capacity(schedules.len());
392        let mut col_job_template_json = Vec::with_capacity(schedules.len());
393        let mut col_scheduling_policy_json = Vec::with_capacity(schedules.len());
394        let mut col_start_after = Vec::with_capacity(schedules.len());
395        let mut col_end_before = Vec::with_capacity(schedules.len());
396
397        for schedule in schedules {
398            let (start_after, end_before) = (schedule.time_range.start, schedule.time_range.end);
399
400            col_id.push(Uuid::now_v7());
401            col_job_template_job_type_id.push(schedule.job_template.job_type_id.as_str());
402            col_job_template_json.push(serde_json::to_string(&schedule.job_template)?);
403            col_scheduling_policy_json.push(serde_json::to_string(&schedule.scheduling)?);
404            col_start_after.push(start_after);
405            col_end_before.push(end_before);
406        }
407
408        {
409            let stmt = tx
410                .prepare(
411                    r#"--sql
412                    INSERT INTO ora.schedule (
413                        id,
414                        job_template_job_type_id,
415                        job_template_json,
416                        scheduling_policy_json,
417                        start_after,
418                        end_before
419                    ) SELECT * FROM UNNEST(
420                        $1::UUID[],
421                        $2::TEXT[],
422                        $3::TEXT[],
423                        $4::TEXT[],
424                        $5::TIMESTAMPTZ[],
425                        $6::TIMESTAMPTZ[]
426                    )
427                    "#,
428                )
429                .await?;
430
431            tx.execute(
432                &stmt,
433                &[
434                    &col_id,
435                    &col_job_template_job_type_id,
436                    &col_job_template_json,
437                    &col_scheduling_policy_json,
438                    &col_start_after,
439                    &col_end_before,
440                ],
441            )
442            .await?;
443        }
444
445        tx.commit().await?;
446
447        let schedule_ids = col_id.into_iter().map(ScheduleId).collect::<Vec<_>>();
448
449        Ok(schedule_ids)
450    }
451
452    async fn list_schedules(
453        &self,
454        filters: ScheduleFilters,
455        order_by: Option<ScheduleOrderBy>,
456        page_size: u32,
457        page_token: Option<NextPageToken>,
458    ) -> Result<(
459        Vec<ScheduleDetails>,
460        Option<ora_backend::common::NextPageToken>,
461    )> {
462        let mut conn = self.pool.get().await?;
463        let tx = conn.read_only_transaction().await?;
464
465        let (schedules, next_page_token) = query::schedules::schedule_details(
466            &tx,
467            filters,
468            order_by.unwrap_or(ScheduleOrderBy::CreatedAtAsc),
469            page_size,
470            page_token.map(|t| t.0),
471        )
472        .await?;
473
474        tx.commit().await?;
475
476        Ok((schedules, next_page_token))
477    }
478
479    async fn count_schedules(&self, filters: ScheduleFilters) -> Result<u64> {
480        let mut conn = self.pool.get().await?;
481        let tx = conn.read_only_transaction().await?;
482        let count = u64::try_from(schedule_count(&tx, filters).await?).unwrap_or_default();
483        tx.commit().await?;
484        Ok(count)
485    }
486
487    async fn stop_schedules(&self, filters: ScheduleFilters) -> Result<Vec<StoppedSchedule>> {
488        let mut conn = self.pool.get().await?;
489        let tx = conn.transaction().await?;
490        let schedules = query::schedules::stop_schedules(&tx, filters).await?;
491        tx.commit().await?;
492        Ok(schedules)
493    }
494
495    fn ready_executions(&self) -> impl Stream<Item = Result<Vec<ReadyExecution>>> + Send {
496        async_stream::try_stream!({
497            let mut last_execution_id: Option<ExecutionId> = None;
498
499            loop {
500                let mut conn = self.pool.get().await?;
501                let tx = conn.read_only_transaction().await?;
502
503                let rows = if let Some(last_execution_id) = last_execution_id {
504                    let stmt = tx
505                        .prepare(
506                            r#"--sql
507                            SELECT
508                                ora.execution.id,
509                                ora.job.id,
510                                ora.job.job_type_id,
511                                ora.job.input_payload_json,
512                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
513                                ora.job.retry_policy_json,
514                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
515                            FROM
516                                ora.execution
517                            JOIN ora.job ON
518                                ora.execution.job_id = ora.job.id
519                            WHERE
520                                status = 0
521                                AND ora.job.target_execution_time <= NOW()
522                                AND ora.execution.id > $1::UUID
523                            ORDER BY
524                                ora.execution.id ASC
525                            LIMIT 1000
526                            "#,
527                        )
528                        .await?;
529
530                    tx.query(&stmt, &[&last_execution_id.0]).await?
531                } else {
532                    let stmt = tx
533                        .prepare(
534                            r#"--sql
535                            SELECT
536                                ora.execution.id,
537                                ora.job.id,
538                                ora.job.job_type_id,
539                                ora.job.input_payload_json,
540                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id),
541                                ora.job.retry_policy_json,
542                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION
543                            FROM
544                                ora.execution
545                            JOIN ora.job ON
546                                ora.execution.job_id = ora.job.id
547                            WHERE
548                                status = 0
549                                AND ora.job.target_execution_time <= NOW()
550                            ORDER BY
551                                ora.execution.id ASC
552                            LIMIT 1000
553                            "#,
554                        )
555                        .await?;
556
557                    tx.query(&stmt, &[]).await?
558                };
559
560                tx.commit().await?;
561
562                if rows.is_empty() {
563                    break;
564                }
565
566                let mut ready_executions = Vec::with_capacity(rows.len());
567
568                for row in rows {
569                    ready_executions.push(ReadyExecution {
570                        execution_id: ExecutionId(row.try_get(0)?),
571                        job_id: JobId(row.try_get(1)?),
572                        job_type_id: JobTypeId::new_unchecked(row.try_get::<_, String>(2)?),
573                        input_payload_json: row.try_get(3)?,
574                        attempt_number: row.try_get::<_, i64>(4)? as u64,
575                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
576                        target_execution_time: UNIX_EPOCH
577                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(6)?),
578                    });
579                }
580
581                last_execution_id = ready_executions.last().map(|e| e.execution_id);
582
583                yield ready_executions;
584            }
585        })
586    }
587
588    async fn wait_for_ready_executions(&self) -> crate::Result<()> {
589        loop {
590            let mut conn = self.pool.get().await?;
591
592            let next_timestamp = {
593                let tx = conn.read_only_transaction().await?;
594
595                let stmt = tx
596                    .prepare(
597                        r#"--sql
598                        SELECT
599                            EXTRACT(EPOCH FROM target_execution_time)::DOUBLE PRECISION
600                        FROM
601                            ora.job
602                        JOIN ora.execution ON
603                            ora.execution.job_id = ora.job.id
604                        WHERE
605                            ora.execution.status = 0
606                        ORDER BY
607                            ora.job.target_execution_time ASC
608                        LIMIT 1
609                        "#,
610                    )
611                    .await?;
612
613                let row = tx.query_opt(&stmt, &[]).await?;
614
615                tx.commit().await?;
616
617                match row {
618                    Some(row) => row
619                        .try_get::<_, Option<f64>>(0)?
620                        .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
621                    None => None,
622                }
623            };
624
625            drop(conn);
626
627            if let Some(next_timestamp) = next_timestamp {
628                let now = std::time::SystemTime::now();
629
630                if next_timestamp <= now {
631                    break;
632                }
633
634                tokio::time::sleep(
635                    next_timestamp
636                        .duration_since(now)
637                        .unwrap_or_else(|_| Duration::from_secs(0)),
638                )
639                .await;
640                break;
641            }
642
643            tokio::time::sleep(Duration::from_millis(500)).await;
644        }
645
646        Ok(())
647    }
648
649    fn in_progress_executions(
650        &self,
651    ) -> impl Stream<Item = Result<Vec<InProgressExecution>>> + Send {
652        async_stream::try_stream!({
653            let mut last_execution_id: Option<ExecutionId> = None;
654
655            loop {
656                let mut conn = self.pool.get().await?;
657                let tx = conn.read_only_transaction().await?;
658
659                let rows = if let Some(last_execution_id) = last_execution_id {
660                    let stmt = tx
661                        .prepare(
662                            r#"--sql
663                            SELECT
664                                ora.execution.id,
665                                ora.job.id,
666                                ora.execution.executor_id,
667                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
668                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
669                                ora.job.timeout_policy_json,
670                                ora.job.retry_policy_json,
671                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
672                            FROM
673                                ora.execution
674                            JOIN ora.job ON
675                                ora.execution.job_id = ora.job.id
676                            WHERE
677                                status = 1
678                                AND ora.execution.id > $1::UUID
679                            ORDER BY
680                                ora.execution.id ASC
681                            LIMIT 1000
682                            "#,
683                        )
684                        .await?;
685
686                    tx.query(&stmt, &[&last_execution_id.0]).await?
687                } else {
688                    let stmt = tx
689                        .prepare(
690                            r#"--sql
691                            SELECT
692                                ora.execution.id,
693                                ora.job.id,
694                                ora.execution.executor_id,
695                                EXTRACT(EPOCH FROM ora.job.target_execution_time)::DOUBLE PRECISION,
696                                EXTRACT(EPOCH FROM ora.execution.started_at)::DOUBLE PRECISION,
697                                ora.job.timeout_policy_json,
698                                ora.job.retry_policy_json,
699                                (SELECT COUNT(*) FROM ora.execution ex WHERE ex.job_id = ora.job.id)
700                            FROM
701                                ora.execution
702                            JOIN ora.job ON
703                                ora.execution.job_id = ora.job.id
704                            WHERE
705                                status = 1
706                            ORDER BY
707                                ora.execution.id ASC
708                            LIMIT 1000
709                        "#,
710                        )
711                        .await?;
712
713                    tx.query(&stmt, &[]).await?
714                };
715
716                tx.commit().await?;
717
718                if rows.is_empty() {
719                    break;
720                }
721
722                let mut in_progress_executions = Vec::with_capacity(rows.len());
723
724                for row in rows {
725                    in_progress_executions.push(InProgressExecution {
726                        execution_id: ExecutionId(row.try_get(0)?),
727                        job_id: JobId(row.try_get(1)?),
728                        executor_id: ExecutorId(row.try_get(2)?),
729                        target_execution_time: UNIX_EPOCH
730                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(3)?),
731                        started_at: UNIX_EPOCH
732                            + std::time::Duration::from_secs_f64(row.try_get::<_, f64>(4)?),
733                        timeout_policy: serde_json::from_str(&row.try_get::<_, String>(5)?)?,
734                        retry_policy: serde_json::from_str(&row.try_get::<_, String>(6)?)?,
735                        attempt_number: row.try_get::<_, i64>(7)? as u64,
736                    });
737                }
738
739                last_execution_id = in_progress_executions.last().map(|e| e.execution_id);
740
741                yield in_progress_executions;
742            }
743        })
744    }
745
746    async fn executions_started(&self, executions: &[StartedExecution]) -> Result<()> {
747        if executions.is_empty() {
748            return Ok(());
749        }
750
751        let mut conn = self.pool.get().await?;
752
753        let tx = conn.transaction().await?;
754
755        let stmt = tx
756            .prepare(
757                r#"--sql
758                UPDATE ora.execution
759                SET
760                    executor_id = t.executor_id,
761                    started_at = to_timestamp(t.started_at)
762                FROM UNNEST(
763                    $1::UUID[],
764                    $2::UUID[],
765                    $3::DOUBLE PRECISION[]
766                ) AS t(execution_id, executor_id, started_at)
767                WHERE
768                    execution_id = id
769                    AND ora.execution.started_at IS NULL
770                "#,
771            )
772            .await?;
773
774        let mut col_execution_id = Vec::with_capacity(executions.len());
775        let mut col_executor_id = Vec::with_capacity(executions.len());
776        let mut col_started_at = Vec::with_capacity(executions.len());
777
778        for execution in executions {
779            col_execution_id.push(execution.execution_id.0);
780            col_executor_id.push(execution.executor_id.0);
781            col_started_at.push(
782                execution
783                    .started_at
784                    .duration_since(UNIX_EPOCH)
785                    .unwrap_or_default()
786                    .as_secs_f64(),
787            );
788        }
789
790        tx.execute(
791            &stmt,
792            &[&col_execution_id, &col_executor_id, &col_started_at],
793        )
794        .await?;
795
796        tx.commit().await?;
797
798        Ok(())
799    }
800
801    async fn executions_succeeded(&self, executions: &[SucceededExecution]) -> Result<()> {
802        if executions.is_empty() {
803            return Ok(());
804        }
805
806        let mut conn = self.pool.get().await?;
807
808        let tx = conn.transaction().await?;
809
810        let stmt = tx
811            .prepare(
812                r#"--sql
813                UPDATE ora.execution
814                SET
815                    succeeded_at = to_timestamp(t.succeeded_at),
816                    output_json = t.output_json
817                FROM UNNEST(
818                    $1::UUID[],
819                    $2::DOUBLE PRECISION[],
820                    $3::TEXT[]
821                ) AS t(execution_id, succeeded_at, output_json)
822                WHERE
823                    execution_id = id
824                    AND status < 2
825                RETURNING job_id
826                "#,
827            )
828            .await?;
829
830        let mut col_execution_id = Vec::with_capacity(executions.len());
831        let mut col_succeeded_at = Vec::with_capacity(executions.len());
832        let mut col_output_json = Vec::with_capacity(executions.len());
833
834        for execution in executions {
835            col_execution_id.push(execution.execution_id.0);
836            col_succeeded_at.push(
837                execution
838                    .succeeded_at
839                    .duration_since(UNIX_EPOCH)
840                    .unwrap_or_default()
841                    .as_secs_f64(),
842            );
843            col_output_json.push(execution.output_json.as_str());
844        }
845
846        let rows = tx
847            .query(
848                &stmt,
849                &[&col_execution_id, &col_succeeded_at, &col_output_json],
850            )
851            .await?;
852
853        let mut job_ids = Vec::with_capacity(rows.len());
854        for row in rows {
855            job_ids.push(JobId(row.try_get(0)?));
856        }
857
858        mark_jobs_inactive(&tx, &job_ids).await?;
859
860        tx.commit().await?;
861
862        Ok(())
863    }
864
865    async fn executions_failed(&self, executions: &[FailedExecution]) -> Result<()> {
866        if executions.is_empty() {
867            return Ok(());
868        }
869
870        let mut conn = self.pool.get().await?;
871        let tx = conn.transaction().await?;
872
873        let jobs = executions_failed(&tx, executions).await?;
874
875        mark_jobs_inactive(&tx, &jobs).await?;
876
877        tx.commit().await?;
878
879        Ok(())
880    }
881
882    async fn executions_retried(&self, executions: &[FailedExecution]) -> Result<()> {
883        if executions.is_empty() {
884            return Ok(());
885        }
886
887        let mut conn = self.pool.get().await?;
888        let tx = conn.transaction().await?;
889
890        let job_ids = executions_failed(&tx, executions).await?;
891        add_executions(&tx, &job_ids).await?;
892
893        tx.commit().await?;
894
895        Ok(())
896    }
897
898    fn pending_schedules(&self) -> impl Stream<Item = Result<Vec<PendingSchedule>>> + Send {
899        async_stream::try_stream!({
900            let mut last_schedule_id: Option<ScheduleId> = None;
901            loop {
902                let mut conn = self.pool.get().await?;
903                let tx = conn.read_only_transaction().await?;
904
905                let rows = if let Some(last_schedule_id) = last_schedule_id {
906                    let stmt = tx
907                        .prepare(
908                            r#"--sql
909                            SELECT
910                                id,
911                                EXTRACT(EPOCH FROM(
912                                    SELECT
913                                        target_execution_time
914                                    FROM
915                                        ora.job
916                                    WHERE
917                                        ora.job.schedule_id = ora.schedule.id
918                                    ORDER BY ora.job.id DESC
919                                    LIMIT 1
920                                ))::DOUBLE PRECISION,
921                                scheduling_policy_json,
922                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
923                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
924                                job_template_json
925                            FROM
926                                ora.schedule
927                            WHERE
928                                (
929                                    ora.schedule.stopped_at IS NULL
930                                    AND ora.schedule.active_job_id IS NULL
931                                )
932                                AND id > $1::UUID
933                            ORDER BY id ASC
934                            LIMIT 1000
935                            "#,
936                        )
937                        .await?;
938
939                    tx.query(&stmt, &[&last_schedule_id.0]).await?
940                } else {
941                    let stmt = tx
942                        .prepare(
943                            r#"--sql
944                            SELECT
945                                id,
946                                EXTRACT(EPOCH FROM(
947                                    SELECT
948                                        target_execution_time
949                                    FROM
950                                        ora.job
951                                    WHERE
952                                        ora.job.schedule_id = ora.schedule.id
953                                    ORDER BY ora.job.id DESC
954                                    LIMIT 1
955                                ))::DOUBLE PRECISION,
956                                scheduling_policy_json,
957                                EXTRACT(EPOCH FROM start_after)::DOUBLE PRECISION,
958                                EXTRACT(EPOCH FROM end_before)::DOUBLE PRECISION,
959                                job_template_json
960                            FROM
961                                ora.schedule
962                            WHERE
963                                ora.schedule.stopped_at IS NULL
964                                AND ora.schedule.active_job_id IS NULL
965                            ORDER BY id ASC
966                            LIMIT 1000
967                            "#,
968                        )
969                        .await?;
970
971                    tx.query(&stmt, &[]).await?
972                };
973
974                tx.commit().await?;
975
976                if rows.is_empty() {
977                    break;
978                }
979
980                let mut schedules = Vec::with_capacity(rows.len());
981
982                for row in rows {
983                    schedules.push(PendingSchedule {
984                        schedule_id: ScheduleId(row.try_get(0)?),
985                        last_target_execution_time: row
986                            .try_get::<_, Option<f64>>(1)?
987                            .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
988                        scheduling: serde_json::from_str(row.try_get::<_, &str>(2)?)?,
989                        time_range: TimeRange {
990                            start: row
991                                .try_get::<_, Option<f64>>(3)?
992                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
993                            end: row
994                                .try_get::<_, Option<f64>>(4)?
995                                .map(|ts| UNIX_EPOCH + std::time::Duration::from_secs_f64(ts)),
996                        },
997                        job_template: serde_json::from_str(row.try_get::<_, &str>(5)?)?,
998                    });
999                }
1000
1001                last_schedule_id = schedules.last().map(|s| s.schedule_id);
1002                yield schedules;
1003            }
1004        })
1005    }
1006
1007    async fn delete_history(&self, before: std::time::SystemTime) -> crate::Result<()> {
1008        let mut conn = self.pool.get().await?;
1009
1010        // delete in a loop so that we don't lock
1011        // tables for too long
1012        loop {
1013            let mut deleted_rows = 0;
1014
1015            {
1016                let tx = conn.transaction().await?;
1017
1018                let stmt = tx
1019                    .prepare(
1020                        r#"--sql
1021                        DELETE FROM ora.job
1022                        WHERE id IN (
1023                            SELECT ora.job.id
1024                            FROM ora.job
1025                            JOIN LATERAL (
1026                                SELECT
1027                                    status,
1028                                    succeeded_at,
1029                                    failed_at,
1030                                    cancelled_at
1031                                FROM
1032                                    ora.execution
1033                                WHERE
1034                                    ora.execution.job_id = ora.job.id
1035                                ORDER BY ora.execution.id DESC
1036                                LIMIT 1
1037                            ) e ON TRUE
1038                            WHERE
1039                                ora.job.inactive
1040                                AND (
1041                                    e.succeeded_at < to_timestamp($1)
1042                                    OR e.failed_at < to_timestamp($1)
1043                                    OR e.cancelled_at < to_timestamp($1)
1044                                )
1045                            LIMIT 25
1046                            FOR UPDATE SKIP LOCKED
1047                        );
1048                        "#,
1049                    )
1050                    .await?;
1051
1052                let deleted = tx
1053                    .execute(
1054                        &stmt,
1055                        &[&before
1056                            .duration_since(UNIX_EPOCH)
1057                            .unwrap_or_default()
1058                            .as_secs_f64()],
1059                    )
1060                    .await?;
1061
1062                deleted_rows += deleted;
1063
1064                tx.commit().await?;
1065            }
1066
1067            {
1068                let tx = conn.transaction().await?;
1069                let stmt = tx
1070                    .prepare(
1071                        r#"--sql
1072                        DELETE FROM ONLY ora.schedule
1073                        WHERE ctid IN (
1074                            SELECT ctid
1075                            FROM ora.schedule
1076                            WHERE 
1077                                ora.schedule.stopped_at < to_timestamp($1)
1078                            LIMIT 25
1079                            FOR UPDATE SKIP LOCKED
1080                        )
1081                        "#,
1082                    )
1083                    .await?;
1084
1085                let deleted = tx
1086                    .execute(
1087                        &stmt,
1088                        &[&before
1089                            .duration_since(UNIX_EPOCH)
1090                            .unwrap_or_default()
1091                            .as_secs_f64()],
1092                    )
1093                    .await?;
1094
1095                tx.commit().await?;
1096
1097                deleted_rows += deleted;
1098            }
1099
1100            {
1101                let tx = conn.transaction().await?;
1102
1103                let stmt = tx
1104                    .prepare(
1105                        r#"--sql
1106                        DELETE FROM ora.job_type
1107                        WHERE
1108                            NOT EXISTS (
1109                                SELECT FROM ora.job
1110                                WHERE
1111                                    ora.job.job_type_id = ora.job_type.id
1112                            )
1113                            AND NOT EXISTS (
1114                                SELECT FROM ora.schedule
1115                                WHERE
1116                                    ora.schedule.job_template_job_type_id = ora.job_type.id                                
1117                            )
1118                        "#,
1119                    )
1120                    .await?;
1121
1122                tx.execute(&stmt, &[]).await?;
1123
1124                tx.commit().await?;
1125            }
1126
1127            if deleted_rows == 0 {
1128                break;
1129            }
1130        }
1131
1132        Ok(())
1133    }
1134
1135    async fn wait_for_pending_schedules(&self) -> crate::Result<()> {
1136        loop {
1137            let mut conn = self.pool.get().await?;
1138
1139            let has_pending = {
1140                let tx = conn.read_only_transaction().await?;
1141
1142                let stmt = tx
1143                    .prepare(
1144                        r#"--sql
1145                        SELECT 1 FROM ora.schedule
1146                        WHERE
1147                            ora.schedule.stopped_at IS NULL
1148                            AND ora.schedule.active_job_id IS NULL
1149                        LIMIT 1
1150                        "#,
1151                    )
1152                    .await?;
1153
1154                let row = tx.query_opt(&stmt, &[]).await?;
1155
1156                tx.commit().await?;
1157
1158                row.is_some()
1159            };
1160
1161            drop(conn);
1162
1163            if has_pending {
1164                break;
1165            }
1166
1167            tokio::time::sleep(Duration::from_millis(500)).await;
1168        }
1169
1170        Ok(())
1171    }
1172}
1173
1174async fn executions_failed(
1175    tx: &DbTransaction<'_>,
1176    executions: &[FailedExecution],
1177) -> Result<Vec<JobId>> {
1178    let stmt = tx
1179        .prepare(
1180            r#"--sql
1181            UPDATE ora.execution
1182            SET
1183                failed_at = to_timestamp(t.failed_at),
1184                failure_reason = t.failure_reason
1185            FROM UNNEST(
1186                $1::UUID[],
1187                $2::DOUBLE PRECISION[],
1188                $3::TEXT[]
1189            ) AS t(execution_id, failed_at, failure_reason)
1190            WHERE
1191                execution_id = id
1192                AND status < 2
1193            RETURNING job_id;
1194            "#,
1195        )
1196        .await?;
1197
1198    let mut col_execution_id = Vec::with_capacity(executions.len());
1199    let mut col_succeeded_at = Vec::with_capacity(executions.len());
1200    let mut col_failure_reason = Vec::with_capacity(executions.len());
1201
1202    for execution in executions {
1203        col_execution_id.push(execution.execution_id.0);
1204        col_succeeded_at.push(
1205            execution
1206                .failed_at
1207                .duration_since(UNIX_EPOCH)
1208                .unwrap_or_default()
1209                .as_secs_f64(),
1210        );
1211        col_failure_reason.push(execution.failure_reason.as_str());
1212    }
1213
1214    let rows = tx
1215        .query(
1216            &stmt,
1217            &[&col_execution_id, &col_succeeded_at, &col_failure_reason],
1218        )
1219        .await?;
1220
1221    let job_ids = rows.into_iter().map(|row| JobId(row.get(0))).collect();
1222
1223    Ok(job_ids)
1224}
1225
1226async fn add_executions(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1227    if jobs.is_empty() {
1228        return Ok(());
1229    }
1230
1231    let mut col_job_id = Vec::with_capacity(jobs.len());
1232    let mut col_execution_id = Vec::with_capacity(jobs.len());
1233
1234    for job in jobs {
1235        col_job_id.push(job.0);
1236        col_execution_id.push(Uuid::now_v7());
1237    }
1238
1239    let stmt = tx
1240        .prepare(
1241            r#"--sql
1242            INSERT INTO ora.execution (
1243                id,
1244                job_id
1245            ) SELECT * FROM UNNEST(
1246                $1::UUID[],
1247                $2::UUID[]
1248            )
1249            "#,
1250        )
1251        .await?;
1252
1253    tx.execute(&stmt, &[&col_execution_id, &col_job_id]).await?;
1254
1255    Ok(())
1256}
1257
1258async fn mark_jobs_inactive(tx: &DbTransaction<'_>, jobs: &[JobId]) -> Result<()> {
1259    if jobs.is_empty() {
1260        return Ok(());
1261    }
1262
1263    let mut col_job_id = Vec::with_capacity(jobs.len());
1264
1265    for job in jobs {
1266        col_job_id.push(job.0);
1267    }
1268
1269    {
1270        let stmt = tx
1271            .prepare(
1272                r#"--sql
1273                UPDATE ora.job
1274                SET inactive = TRUE
1275                WHERE id = ANY($1::UUID[])
1276                "#,
1277            )
1278            .await?;
1279
1280        tx.execute(&stmt, &[&col_job_id]).await?;
1281    }
1282
1283    {
1284        let stmt = tx
1285            .prepare(
1286                r#"--sql
1287                UPDATE ora.schedule
1288                SET
1289                    active_job_id = NULL
1290                WHERE active_job_id = ANY($1::UUID[])
1291                "#,
1292            )
1293            .await?;
1294
1295        tx.execute(&stmt, &[&col_job_id]).await?;
1296    }
1297
1298    Ok(())
1299}