1use std::sync::Arc;
4
5use async_trait::async_trait;
6use eyre::Context;
7use migrations::run_migrations;
8use models::{JobRetryPolicy, JobTimeoutPolicy, SqlSystemTime};
9use ora_storage::{ScheduleTimeRange, Storage};
10use parking_lot::Mutex;
11use rusqlite::{named_params, params, params_from_iter, Connection};
12use uuid::Uuid;
13
14#[macro_use]
15mod util;
16
17mod migrations;
18mod models;
19
20mod job_query;
21mod schedule_query;
22
23mod sea_query_binder;
24
/// Number of IDs at which the batch operations in this file switch strategy:
/// shorter lists are processed with one statement execution per ID, while
/// lists of this length or longer are first staged in a temporary table and
/// then handled with a single set-based statement.
const DEFAULT_TEMP_TABLE_THRESHOLD: usize = 1000;
26
/// SQLite-backed storage handle.
///
/// Wraps a single [`Connection`] behind an `Arc<Mutex<_>>`, so clones of this
/// value share the same underlying connection.
#[derive(Debug, Clone)]
pub struct SqliteStorage {
    /// The shared connection, guarded by a mutex.
    db: Arc<Mutex<Connection>>,
}
32
33impl SqliteStorage {
34    pub fn new(mut connection: Connection) -> eyre::Result<Self> {
36        run_migrations(&mut connection).wrap_err("failed to run migrations")?;
37        Ok(Self {
38            db: Arc::new(Mutex::new(connection)),
39        })
40    }
41
42    pub async fn optimize(&self) -> eyre::Result<()> {
46        self.with_db(|db| {
47            db.execute_batch(
48                r#"--sql
49                    PRAGMA optimize;
50                "#,
51            )?;
52            Ok(())
53        })
54        .await
55    }
56
57    pub async fn vacuum(&self) -> eyre::Result<()> {
62        self.with_db(|db| {
63            db.execute_batch(
64                r#"--sql
65                    VACUUM;
66                "#,
67            )?;
68            Ok(())
69        })
70        .await
71    }
72
73    async fn with_db<F, O>(&self, f: F) -> eyre::Result<O>
76    where
77        F: FnOnce(&mut Connection) -> eyre::Result<O> + Send + 'static,
78        O: Send + 'static,
79    {
80        let span = tracing::Span::current();
81
82        let db = self.db.clone();
83        tokio::task::spawn_blocking(move || {
84            let _guard = span.enter();
85            tracing::trace!("acquiring database lock");
86            let mut db = db.lock();
87            tracing::trace!("acquired database lock");
88            f(&mut db)
89        })
90        .await
91        .unwrap()
92    }
93}
94
95#[async_trait]
96impl Storage for SqliteStorage {
97    #[tracing::instrument(skip_all)]
98    async fn job_types_added(&self, job_types: Vec<ora_storage::JobType>) -> eyre::Result<()> {
99        if job_types.is_empty() {
100            return Ok(());
101        }
102
103        self.with_db(|db| {
104            let tx = db.transaction()?;
105
106            {
107                let mut insert_stmt = tx.prepare(
108                    r#"--sql
109                        INSERT INTO ora_job_type (
110                            id,
111                            name,
112                            description,
113                            input_schema_json,
114                            output_schema_json
115                        )
116                        VALUES (
117                            :id,
118                            :name,
119                            :description,
120                            :input_schema_json,
121                            :output_schema_json
122                        )
123                        ON CONFLICT(id) DO UPDATE SET
124                            name = excluded.name,
125                            description = excluded.description,
126                            input_schema_json = excluded.input_schema_json,
127                            output_schema_json = excluded.output_schema_json;
128                    "#,
129                )?;
130
131                for job_type in job_types {
132                    insert_stmt.execute(params![
133                        job_type.id,
134                        job_type.name,
135                        job_type.description,
136                        job_type.input_schema_json,
137                        job_type.output_schema_json
138                    ])?;
139                }
140            }
141
142            tx.commit()?;
143
144            Ok(())
145        })
146        .await
147    }
148
149    #[tracing::instrument(skip_all)]
150    async fn jobs_added(&self, jobs: Vec<ora_storage::NewJob>) -> eyre::Result<()> {
151        if jobs.is_empty() {
152            return Ok(());
153        }
154
155        self.with_db(|db| {
156            let tx = db.transaction()?;
157
158            {
159                let mut insert_stmt = tx.prepare_cached(
160                    r#"--sql
161                        INSERT INTO ora_job (
162                            id,
163                            schedule_id,
164                            created_at_unix_ns,
165                            job_type_id,
166                            target_execution_time_unix_ns,
167                            retry_policy,
168                            timeout_policy,
169                            input_payload_json,
170                            metadata_json
171                        )
172                        VALUES (
173                            :id,
174                            :schedule_id,
175                            :created_at_unix_ns,
176                            :job_type_id,
177                            :target_execution_time_unix_ns,
178                            :retry_policy,
179                            :timeout_policy,
180                            :input_payload_json,
181                            :metadata_json
182                        );
183                    "#,
184                )?;
185
186                let mut insert_label_stmt = tx.prepare_cached(
187                    r#"--sql
188                        INSERT INTO ora_job_label (
189                            job_id,
190                            key,
191                            value
192                        )
193                        VALUES (
194                            :job_id,
195                            :key,
196                            :value
197                        );
198                    "#,
199                )?;
200
201                for job in jobs {
202                    insert_stmt.execute(params![
203                        job.id,
204                        job.schedule_id,
205                        SqlSystemTime(job.created_at),
206                        job.job_type_id,
207                        SqlSystemTime(job.target_execution_time),
208                        JobRetryPolicy::from(job.retry_policy),
209                        JobTimeoutPolicy::from(job.timeout_policy),
210                        job.input_payload_json,
211                        job.metadata_json
212                    ])?;
213
214                    for (key, value) in job.labels {
215                        insert_label_stmt.execute(params![job.id, key, value])?;
216                    }
217                }
218            }
219
220            tx.commit()?;
221
222            Ok(())
223        })
224        .await
225    }
226
227    #[tracing::instrument(skip_all)]
228    async fn jobs_cancelled(
229        &self,
230        job_ids: &[Uuid],
231        timestamp: std::time::SystemTime,
232    ) -> eyre::Result<Vec<ora_storage::CancelledJob>> {
233        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
234
235        if job_ids.is_empty() {
236            return Ok(vec![]);
237        }
238
239        let job_ids: Vec<_> = job_ids.into();
240
241        self.with_db(move |db| {
242            let tx = db.transaction()?;
243
244            let cancelled_jobs = if job_ids.len() < TEMP_TABLE_THRESHOLD {
245                let mut update_stmt = tx.prepare_cached(
246                    r#"--sql
247                        UPDATE ora_job
248                        SET
249                            cancelled_at_unix_ns = :cancelled_at_unix_ns,
250                            marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
251                        WHERE
252                            id = :id
253                            AND cancelled_at_unix_ns IS NULL
254                            AND marked_unschedulable_at_unix_ns IS NULL
255                        RETURNING id, (
256                            SELECT id
257                            FROM ora_execution
258                            WHERE job_id = ora_job.id
259                            AND active
260                            LIMIT 1
261                        ) AS execution_id;
262                    "#,
263                )?;
264
265                let mut cancelled_jobs = Vec::new();
266
267                for id in &job_ids {
268                    let mut rows = update_stmt.query_map(
269                        named_params![
270                            ":cancelled_at_unix_ns": SqlSystemTime(timestamp),
271                            ":id": id
272                        ],
273                        |row| {
274                            Ok(ora_storage::CancelledJob {
275                                id: row.get(0)?,
276                                active_execution: row.get(1)?,
277                            })
278                        },
279                    )?;
280
281                    if let Some(cancelled_job) = rows.next() {
282                        let cancelled_job = cancelled_job?;
283                        cancelled_jobs.push(cancelled_job);
284                    }
285                }
286
287                cancelled_jobs
288            } else {
289                {
290                    tx.execute("CREATE TABLE temp.job_ids (id BLOB)", [])?;
291
292                    let mut insert_stmt = tx.prepare("INSERT INTO temp.job_ids (id) VALUES (?)")?;
293
294                    for id in &job_ids {
295                        insert_stmt.execute(params![id])?;
296                    }
297                }
298
299                let mut update_stmt = tx.prepare_cached(
300                    r#"--sql
301                    UPDATE ora_job
302                    SET
303                        cancelled_at_unix_ns = :cancelled_at_unix_ns,
304                        marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
305                    WHERE
306                        id IN (SELECT id FROM temp.job_ids)
307                        AND cancelled_at_unix_ns IS NULL
308                        AND marked_unschedulable_at_unix_ns IS NULL
309                    RETURNING id, (
310                        SELECT id
311                        FROM ora_execution
312                        WHERE job_id = ora_job.id
313                        AND active
314                        LIMIT 1
315                    ) AS execution_id;
316                    "#,
317                )?;
318
319                let cancelled_jobs = update_stmt
320                    .query_map(
321                        named_params![
322                            ":cancelled_at_unix_ns": SqlSystemTime(timestamp)
323                        ],
324                        |row| {
325                            Ok(ora_storage::CancelledJob {
326                                id: row.get(0)?,
327                                active_execution: row.get(1)?,
328                            })
329                        },
330                    )?
331                    .collect::<Result<Vec<ora_storage::CancelledJob>, _>>()?;
332
333                tx.execute("DROP TABLE temp.job_ids", [])?;
334
335                cancelled_jobs
336            };
337
338            tx.commit()?;
339
340            Ok(cancelled_jobs)
341        })
342        .await
343    }
344
345    #[tracing::instrument(skip_all)]
346    async fn executions_added(
347        &self,
348        executions: Vec<ora_storage::NewExecution>,
349        timestamp: std::time::SystemTime,
350    ) -> eyre::Result<()> {
351        if executions.is_empty() {
352            return Ok(());
353        }
354
355        self.with_db(move |db| {
356            let tx = db.transaction()?;
357
358            {
359                let mut insert_stmt = tx.prepare_cached(
360                    r#"--sql
361                        INSERT INTO ora_execution (
362                            id,
363                            job_id,
364                            created_at_unix_ns
365                        )
366                        VALUES (
367                            :id,
368                            :job_id,
369                            :created_at_unix_ns
370                        );
371                    "#,
372                )?;
373
374                for execution in executions {
375                    insert_stmt.execute(params![
376                        execution.id,
377                        execution.job_id,
378                        SqlSystemTime(timestamp)
379                    ])?;
380                }
381            }
382
383            tx.commit()?;
384
385            Ok(())
386        })
387        .await
388    }
389
390    #[tracing::instrument(skip_all)]
391    async fn executions_ready(
392        &self,
393        execution_ids: &[Uuid],
394        timestamp: std::time::SystemTime,
395    ) -> eyre::Result<()> {
396        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
397
398        if execution_ids.is_empty() {
399            return Ok(());
400        }
401
402        let execution_ids: Vec<_> = execution_ids.into();
403
404        self.with_db(move |db| {
405            let tx = db.transaction()?;
406
407            if execution_ids.len() < TEMP_TABLE_THRESHOLD {
408                let mut update_stmt = tx.prepare_cached(
409                    r#"--sql
410                        UPDATE ora_execution
411                        SET ready_at_unix_ns = :ready_at_unix_ns
412                        WHERE id = :id;
413                    "#,
414                )?;
415
416                for id in &execution_ids {
417                    update_stmt.execute(params![SqlSystemTime(timestamp), id])?;
418                }
419            } else {
420                {
421                    tx.execute("CREATE TABLE temp.execution_ids (id BLOB)", [])?;
422
423                    let mut insert_stmt =
424                        tx.prepare("INSERT INTO temp.execution_ids (id) VALUES (?)")?;
425
426                    for id in &execution_ids {
427                        insert_stmt.execute(params![id])?;
428                    }
429                }
430
431                let mut update_stmt = tx.prepare_cached(
432                    r#"--sql
433                        UPDATE ora_execution
434                        SET ready_at_unix_ns = :ready_at_unix_ns
435                        WHERE
436                            id IN (SELECT id FROM temp.execution_ids)
437                            AND ready_at_unix_ns IS NULL;
438                    "#,
439                )?;
440
441                update_stmt.execute(params![SqlSystemTime(timestamp)])?;
442
443                tx.execute("DROP TABLE IF EXISTS temp.execution_ids", [])?;
444            }
445
446            tx.commit()?;
447
448            Ok(())
449        })
450        .await
451    }
452
453    #[tracing::instrument(skip_all)]
454    async fn execution_assigned(
455        &self,
456        execution_id: Uuid,
457        executor_id: Uuid,
458        timestamp: std::time::SystemTime,
459    ) -> eyre::Result<()> {
460        self.with_db(move |db| {
461            let tx = db.transaction()?;
462
463            {
464                let mut update_stmt = tx.prepare_cached(
465                    r#"--sql
466                        UPDATE ora_execution
467                        SET executor_id = :executor_id,
468                            assigned_at_unix_ns = :assigned_at_unix_ns
469                        WHERE id = :id;
470                    "#,
471                )?;
472
473                update_stmt.execute(named_params![
474                    ":executor_id": executor_id,
475                    ":assigned_at_unix_ns": SqlSystemTime(timestamp),
476                    ":id": execution_id
477                ])?;
478            }
479
480            tx.commit()?;
481
482            Ok(())
483        })
484        .await
485    }
486
487    #[tracing::instrument(skip_all)]
488    async fn execution_started(
489        &self,
490        execution_id: Uuid,
491        timestamp: std::time::SystemTime,
492    ) -> eyre::Result<()> {
493        self.with_db(move |db| {
494            let tx = db.transaction()?;
495
496            {
497                let mut update_stmt = tx.prepare_cached(
498                    r#"--sql
499                        UPDATE ora_execution
500                        SET started_at_unix_ns = :started_at_unix_ns
501                        WHERE id = :id;
502                    "#,
503                )?;
504
505                update_stmt.execute(named_params! {
506                    ":started_at_unix_ns": SqlSystemTime(timestamp),
507                    ":id": execution_id
508                })?;
509            }
510
511            tx.commit()?;
512
513            Ok(())
514        })
515        .await
516    }
517
518    #[tracing::instrument(skip_all)]
519    async fn execution_succeeded(
520        &self,
521        execution_id: Uuid,
522        timestamp: std::time::SystemTime,
523        output_payload_json: String,
524    ) -> eyre::Result<()> {
525        self.with_db(move |db| {
526            let tx = db.transaction()?;
527
528            {
529                let mut update_stmt = tx.prepare_cached(
530                    r#"--sql
531                        UPDATE ora_execution
532                        SET succeeded_at_unix_ns = :succeeded_at_unix_ns,
533                            output_payload_json = :output_payload_json
534                        WHERE id = :id;
535                        "#,
536                )?;
537
538                update_stmt.execute(named_params![
539                    ":succeeded_at_unix_ns": SqlSystemTime(timestamp),
540                    ":output_payload_json": output_payload_json,
541                    ":id": execution_id
542                ])?;
543            }
544
545            {
546                let mut update_stmt = tx.prepare_cached(
547                    r#"--sql
548                    UPDATE ora_job
549                    SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
550                    WHERE id = (SELECT job_id FROM ora_execution WHERE id = :id);
551                    "#,
552                )?;
553
554                update_stmt.execute(named_params! {
555                    ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
556                    ":id": execution_id
557                })?;
558            }
559            tx.commit()?;
560
561            Ok(())
562        })
563        .await
564    }
565
566    #[tracing::instrument(skip_all)]
567    async fn executions_failed(
568        &self,
569        execution_ids: &[Uuid],
570        timestamp: std::time::SystemTime,
571        reason: String,
572        mark_job_unschedulable: bool,
573    ) -> eyre::Result<()> {
574        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
575
576        if execution_ids.is_empty() {
577            return Ok(());
578        }
579
580        let execution_ids: Vec<_> = execution_ids.into();
581
582        self.with_db(move |db| {
583            let tx = db.transaction()?;
584
585            if execution_ids.len() < TEMP_TABLE_THRESHOLD {
586                let mut update_stmt = tx.prepare_cached(
587                    r#"--sql
588                        UPDATE ora_execution
589                        SET failed_at_unix_ns = :failed_at_unix_ns,
590                            failure_reason = :failure_reason
591                        WHERE id = :id;
592                    "#,
593                )?;
594
595                for id in &execution_ids {
596                    update_stmt.execute(named_params![
597                        ":failed_at_unix_ns": SqlSystemTime(timestamp),
598                        ":failure_reason": reason,
599                        ":id": id
600                    ])?;
601                }
602
603                if mark_job_unschedulable {
604                    let mut update_stmt = tx.prepare_cached(
605                        r#"--sql
606                            UPDATE ora_job
607                            SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
608                            WHERE id = (SELECT job_id FROM ora_execution WHERE id = :id);
609                        "#,
610                    )?;
611
612                    for id in &execution_ids {
613                        update_stmt.execute(named_params! {
614                            ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
615                            ":id": id
616                        })?;
617                    }
618                }
619            } else {
620                {
621                    tx.execute("CREATE TABLE temp.execution_ids (id BLOB)", [])?;
622
623                    let mut insert_stmt =
624                        tx.prepare("INSERT INTO temp.execution_ids (id) VALUES (?)")?;
625
626                    for id in &execution_ids {
627                        insert_stmt.execute(params![id])?;
628                    }
629                }
630
631                let mut update_stmt = tx.prepare_cached(
632                    r#"--sql
633                        UPDATE ora_execution
634                        SET failed_at_unix_ns = :failed_at_unix_ns,
635                            failure_reason = :failure_reason
636                        WHERE
637                            id IN (SELECT id FROM temp.execution_ids)
638                            AND failed_at_unix_ns IS NULL;
639                    "#,
640                )?;
641
642                update_stmt.execute(named_params![
643                    ":failed_at_unix_ns": SqlSystemTime(timestamp),
644                    ":failure_reason": reason
645                ])?;
646
647                if mark_job_unschedulable {
648                    let mut update_stmt = tx.prepare_cached(
649                        r#"--sql
650                            UPDATE ora_job
651                            SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
652                            WHERE id IN (SELECT job_id FROM ora_execution WHERE id IN (SELECT id FROM temp.execution_ids));
653                        "#,
654                    )?;
655
656                    update_stmt.execute(named_params! {
657                        ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
658                    })?;
659                }
660            }
661
662
663
664            tx.execute("DROP TABLE IF EXISTS temp.execution_ids", [])?;
665
666            tx.commit()?;
667
668            Ok(())
669        })
670        .await
671    }
672
673    #[tracing::instrument(skip_all)]
674    async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>> {
675        let executor_ids: Vec<_> = executor_ids.into();
676
677        self.with_db(move |db| {
678            if executor_ids.is_empty() {
679                let mut stmt = db.prepare(
680                    r#"--sql
681                        SELECT id
682                        FROM ora_execution
683                        WHERE
684                            executor_id IS NOT NULL
685                            AND (
686                                succeeded_at_unix_ns IS NULL
687                                AND failed_at_unix_ns IS NULL
688                            );
689                    "#,
690                )?;
691
692                let ids = stmt
693                    .query_map(params_from_iter(&executor_ids), |row| row.get(0))?
694                    .collect::<Result<Vec<Uuid>, _>>()?;
695
696                Ok(ids)
697            } else {
698                let id_params_sql = "?,".repeat(executor_ids.len());
699                let id_params_sql = &id_params_sql[..id_params_sql.len() - 1];
700
701                let mut stmt = db.prepare(&format!(
702                    r#"--sql
703                        SELECT id
704                        FROM ora_execution
705                        WHERE
706                            executor_id NOT IN ({id_params_sql})
707                            AND (
708                                succeeded_at_unix_ns IS NULL
709                                AND failed_at_unix_ns IS NULL
710                            );
711                    "#
712                ))?;
713
714                let ids = stmt
715                    .query_map(params_from_iter(&executor_ids), |row| row.get(0))?
716                    .collect::<Result<Vec<Uuid>, _>>()?;
717
718                Ok(ids)
719            }
720        })
721        .await
722    }
723
724    #[tracing::instrument(skip_all)]
725    async fn jobs_unschedulable(
726        &self,
727        job_ids: &[Uuid],
728        timestamp: std::time::SystemTime,
729    ) -> eyre::Result<()> {
730        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
731
732        if job_ids.is_empty() {
733            return Ok(());
734        }
735
736        let job_ids: Vec<_> = job_ids.into();
737
738        self.with_db(move |db| {
739            let tx = db.transaction()?;
740
741            if job_ids.len() < TEMP_TABLE_THRESHOLD {
742                let mut update_stmt = tx.prepare_cached(
743                    r#"--sql
744                        UPDATE ora_job
745                        SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
746                        WHERE id = :id AND marked_unschedulable_at_unix_ns IS NULL;
747                    "#,
748                )?;
749
750                for id in &job_ids {
751                    update_stmt.execute(named_params![
752                        ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
753                        ":id": id
754                    ])?;
755                }
756            } else {
757                {
758                    tx.execute("CREATE TABLE temp.job_ids (id BLOB)", [])?;
759
760                    let mut insert_stmt = tx.prepare("INSERT INTO temp.job_ids (id) VALUES (?)")?;
761
762                    for id in &job_ids {
763                        insert_stmt.execute(params![id])?;
764                    }
765                }
766
767                let mut update_stmt = tx.prepare_cached(
768                    r#"--sql
769                        UPDATE ora_job
770                        SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
771                        WHERE
772                            id IN (SELECT id FROM temp.job_ids)
773                            AND marked_unschedulable_at_unix_ns IS NULL;
774                    "#,
775                )?;
776
777                update_stmt.execute(named_params![
778                    ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
779                ])?;
780
781                tx.execute("DROP TABLE temp.job_ids", [])?;
782            }
783
784            tx.commit()?;
785
786            Ok(())
787        })
788        .await
789    }
790
791    #[tracing::instrument(skip_all)]
792    async fn pending_executions(
793        &self,
794        after: Option<Uuid>,
795    ) -> eyre::Result<Vec<ora_storage::PendingExecution>> {
796        self.with_db(move |db| {
797            let mut stmt = db.prepare_cached(
798                r#"--sql
799                    SELECT
800                        e.id,
801                        j.target_execution_time_unix_ns
802                    FROM ora_execution e
803                    JOIN
804                        ora_job j
805                    ON
806                        e.job_id = j.id
807                    WHERE
808                        e.ready_at_unix_ns IS NULL
809                        AND (? IS NULL OR e.id > ?)
810                    LIMIT 10000;
811                "#,
812            )?;
813
814            let pending_executions = stmt
815                .query_map(params![after, after], |row| {
816                    Ok(ora_storage::PendingExecution {
817                        id: row.get(0)?,
818                        target_execution_time: row.get::<_, SqlSystemTime>(1)?.into(),
819                    })
820                })?
821                .collect::<Result<Vec<_>, _>>()?;
822
823            Ok(pending_executions)
824        })
825        .await
826    }
827
828    #[tracing::instrument(skip_all)]
829    async fn ready_executions(
830        &self,
831        after: Option<Uuid>,
832    ) -> eyre::Result<Vec<ora_storage::ReadyExecution>> {
833        self.with_db(move |db| {
834            let mut stmt = db.prepare_cached(
835                r#"--sql
836                    SELECT
837                        e.id,
838                        e.job_id,
839                        j.input_payload_json,
840                        (SELECT COUNT(*) FROM ora_execution WHERE job_id = e.job_id) AS attempt_number,
841                        j.job_type_id,
842                        j.target_execution_time_unix_ns,
843                        j.timeout_policy
844                    FROM ora_execution e
845                    JOIN
846                        ora_job j
847                    ON
848                        j.id = e.job_id
849                    WHERE
850                        ready_at_unix_ns IS NOT NULL
851                        AND assigned_at_unix_ns IS NULL
852                        AND started_at_unix_ns IS NULL
853                        AND failed_at_unix_ns IS NULL
854                        AND succeeded_at_unix_ns IS NULL
855                        AND (? IS NULL OR e.id > ?)
856                    ORDER BY e.id ASC
857                    LIMIT 10000;
858                "#,
859            )?;
860
861            let mut rows = stmt.query([
862                after,
863                after,
864            ])?;
865
866            let mut ready_executions = Vec::new();
867
868            while let Some(row) = rows.next()? {
869                ready_executions.push(ora_storage::ReadyExecution {
870                    id: row.get(0)?,
871                    job_id: row.get(1)?,
872                    input_payload_json: row.get(2)?,
873                    attempt_number: row.get(3)?,
874                    job_type_id: row.get(4)?,
875                    target_execution_time: row.get::<_, SqlSystemTime>(5)?.into(),
876                    timeout_policy: row.get::<_, JobTimeoutPolicy>(6)?.into(),
877                });
878            }
879
880            Ok(ready_executions)
881
882        })
883        .await
884    }
885
886    #[tracing::instrument(skip_all)]
887    async fn pending_jobs(
888        &self,
889        after: Option<Uuid>,
890    ) -> eyre::Result<Vec<ora_storage::PendingJob>> {
891        self.with_db(move |db| {
892            let mut stmt = db.prepare_cached(
893                r#"--sql
894                    SELECT
895                        id,
896                        target_execution_time_unix_ns,
897                        (
898                            SELECT COUNT(*)
899                            FROM ora_execution
900                            WHERE job_id = ora_job.id
901                        ) AS execution_count,
902                        retry_policy,
903                        timeout_policy
904                    FROM ora_job
905                    WHERE
906                        marked_unschedulable_at_unix_ns IS NULL
907                        AND NOT EXISTS (
908                            SELECT 1
909                            FROM ora_execution
910                            WHERE job_id = ora_job.id
911                            AND succeeded_at_unix_ns IS NULL
912                            AND failed_at_unix_ns IS NULL
913                        )
914                        AND (? IS NULL OR id > ?)
915                    ORDER BY id ASC
916                    LIMIT 10000;
917                "#,
918            )?;
919
920            let mut rows = stmt.query([after, after])?;
921
922            let mut pending_jobs = Vec::new();
923
924            while let Some(row) = rows.next()? {
925                pending_jobs.push(ora_storage::PendingJob {
926                    id: row.get(0)?,
927                    target_execution_time: row.get::<_, SqlSystemTime>(1)?.0,
928                    execution_count: row.get(2)?,
929                    retry_policy: row.get::<_, JobRetryPolicy>(3)?.into(),
930                    timeout_policy: row.get::<_, JobTimeoutPolicy>(4)?.into(),
931                });
932            }
933
934            Ok(pending_jobs)
935        })
936        .await
937    }
938
939    #[tracing::instrument(skip_all)]
940    async fn query_jobs(
941        &self,
942        cursor: Option<String>,
943        limit: usize,
944        order: ora_storage::JobQueryOrder,
945        filters: ora_storage::JobQueryFilters,
946    ) -> eyre::Result<ora_storage::JobQueryResult> {
947        let cursor: Option<job_query::Cursor> = match cursor {
948            Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
949            None => None,
950        };
951
952        self.with_db(move |db| {
953            let mut tx = db.transaction()?;
954            let res = job_query::query_job_details(&mut tx, cursor, limit, order, filters);
955            tx.commit()?;
956            res
957        })
958        .await
959    }
960
961    #[tracing::instrument(skip_all)]
962    async fn query_job_ids(
963        &self,
964        filters: ora_storage::JobQueryFilters,
965    ) -> eyre::Result<Vec<Uuid>> {
966        self.with_db(|db| {
967            let mut tx = db.transaction()?;
968            let res = job_query::job_ids(&mut tx, filters);
969            tx.commit()?;
970            res
971        })
972        .await
973    }
974
975    #[tracing::instrument(skip_all)]
976    async fn count_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<u64> {
977        self.with_db(|db| {
978            let mut tx = db.transaction()?;
979            let res = job_query::count_jobs(&mut tx, filters);
980            tx.commit()?;
981            res
982        })
983        .await
984    }
985
986    #[tracing::instrument(skip_all)]
987    async fn query_job_types(&self) -> eyre::Result<Vec<ora_storage::JobType>> {
988        self.with_db(|db| {
989            let mut stmt = db.prepare_cached(
990                r#"--sql
991                    SELECT
992                        id,
993                        name,
994                        description,
995                        input_schema_json,
996                        output_schema_json
997                    FROM ora_job_type;
998                "#,
999            )?;
1000
1001            let job_types = stmt
1002                .query_map([], |row| {
1003                    Ok(ora_storage::JobType {
1004                        id: row.get(0)?,
1005                        name: row.get(1)?,
1006                        description: row.get(2)?,
1007                        input_schema_json: row.get(3)?,
1008                        output_schema_json: row.get(4)?,
1009                    })
1010                })?
1011                .collect::<Result<Vec<_>, _>>()?;
1012
1013            Ok(job_types)
1014        })
1015        .await
1016    }
1017
1018    #[tracing::instrument(skip_all)]
1019    async fn delete_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<Vec<Uuid>> {
1020        self.with_db(|db| {
1021            let mut tx = db.transaction()?;
1022            let res = job_query::delete_jobs(&mut tx, filters);
1023            tx.commit()?;
1024            res
1025        })
1026        .await
1027    }
1028
1029    #[tracing::instrument(skip_all)]
1030    async fn schedules_added(&self, schedules: Vec<ora_storage::NewSchedule>) -> eyre::Result<()> {
1031        if schedules.is_empty() {
1032            return Ok(());
1033        }
1034
1035        self.with_db(|db| {
1036            let tx = db.transaction()?;
1037
1038            {
1039                let mut insert_stmt = tx.prepare_cached(
1040                    r#"--sql
1041                        INSERT INTO ora_schedule (
1042                            id,
1043                            created_at_unix_ns,
1044                            job_type_id,
1045                            job_timing_policy,
1046                            job_creation_policy,
1047                            start_after_unix_ns,
1048                            end_before_unix_ns,
1049                            metadata_json
1050                        )
1051                        VALUES (
1052                            :id,
1053                            :created_at_unix_ns,
1054                            :job_type_id,
1055                            :job_timing_policy,
1056                            :job_creation_policy,
1057                            :start_after_unix_ns,
1058                            :end_before_unix_ns,
1059                            :metadata_json
1060                        );
1061                    "#,
1062                )?;
1063
1064                for schedule in schedules {
1065                    let job_type_id = match &schedule.job_creation_policy {
1066                        ora_storage::ScheduleJobCreationPolicy::JobDefinition(
1067                            schedule_new_job_definition,
1068                        ) => schedule_new_job_definition.job_type_id.clone(),
1069                    };
1070
1071                    let start_after = schedule.time_range.as_ref().and_then(|r| r.start);
1072                    let end_before = schedule.time_range.as_ref().and_then(|r| r.end);
1073
1074                    insert_stmt.execute(params![
1075                        schedule.id,
1076                        SqlSystemTime(schedule.created_at),
1077                        job_type_id,
1078                        crate::models::ScheduleJobTimingPolicy::from(schedule.job_timing_policy),
1079                        crate::models::ScheduleJobCreationPolicy::from(
1080                            schedule.job_creation_policy
1081                        ),
1082                        start_after.map(SqlSystemTime),
1083                        end_before.map(SqlSystemTime),
1084                        schedule.metadata_json
1085                    ])?;
1086
1087                    let mut insert_label_stmt = tx.prepare_cached(
1088                        r#"--sql
1089                            INSERT INTO ora_schedule_label (
1090                                schedule_id,
1091                                key,
1092                                value
1093                            )
1094                            VALUES (
1095                                :schedule_id,
1096                                :key,
1097                                :value
1098                            );
1099                        "#,
1100                    )?;
1101
1102                    for (key, value) in schedule.labels {
1103                        insert_label_stmt.execute(params![schedule.id, key, value])?;
1104                    }
1105                }
1106            }
1107
1108            tx.commit()?;
1109
1110            Ok(())
1111        })
1112        .await
1113    }
1114
1115    #[tracing::instrument(skip_all)]
1116    async fn schedules_cancelled(
1117        &self,
1118        schedule_ids: &[Uuid],
1119        timestamp: std::time::SystemTime,
1120    ) -> eyre::Result<Vec<ora_storage::CancelledSchedule>> {
1121        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
1122
1123        if schedule_ids.is_empty() {
1124            return Ok(vec![]);
1125        }
1126
1127        let schedule_ids: Vec<_> = schedule_ids.into();
1128
1129        self.with_db(move |db| {
1130            let tx = db.transaction()?;
1131
1132            let cancelled_schedules = if schedule_ids.len() < TEMP_TABLE_THRESHOLD {
1133                let mut update_stmt = tx.prepare_cached(
1134                    r#"--sql
1135                        UPDATE ora_schedule
1136                        SET
1137                            cancelled_at_unix_ns = :cancelled_at_unix_ns,
1138                            marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
1139                        WHERE
1140                            id = :id
1141                            AND cancelled_at_unix_ns IS NULL
1142                            AND marked_unschedulable_at_unix_ns IS NULL
1143                        RETURNING id;
1144                    "#,
1145                )?;
1146
1147                let mut cancelled_schedules = Vec::new();
1148
1149                for id in &schedule_ids {
1150                    let mut rows = update_stmt.query_map(
1151                        named_params![
1152                            ":cancelled_at_unix_ns": SqlSystemTime(timestamp),
1153                            ":id": id
1154                        ],
1155                        |row| Ok(ora_storage::CancelledSchedule { id: row.get(0)? }),
1156                    )?;
1157
1158                    if let Some(cancelled_schedule) = rows.next() {
1159                        let cancelled_schedule = cancelled_schedule?;
1160                        cancelled_schedules.push(cancelled_schedule);
1161                    }
1162                }
1163
1164                cancelled_schedules
1165            } else {
1166                {
1167                    tx.execute("CREATE TABLE temp.schedule_ids (id BLOB)", [])?;
1168
1169                    let mut insert_stmt =
1170                        tx.prepare("INSERT INTO temp.schedule_ids (id) VALUES (?)")?;
1171
1172                    for id in &schedule_ids {
1173                        insert_stmt.execute(params![id])?;
1174                    }
1175                }
1176
1177                let mut update_stmt = tx.prepare_cached(
1178                    r#"--sql
1179                    UPDATE ora_schedule
1180                    SET
1181                        cancelled_at_unix_ns = :cancelled_at_unix_ns,
1182                        marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
1183                    WHERE
1184                        id IN (SELECT id FROM temp.schedule_ids)
1185                        AND cancelled_at_unix_ns IS NULL
1186                        AND marked_unschedulable_at_unix_ns IS NULL
1187                    RETURNING id;
1188                    "#,
1189                )?;
1190
1191                let cancelled_schedules = update_stmt
1192                    .query_map(
1193                        named_params![
1194                            ":cancelled_at_unix_ns": SqlSystemTime(timestamp)
1195                        ],
1196                        |row| Ok(ora_storage::CancelledSchedule { id: row.get(0)? }),
1197                    )?
1198                    .collect::<Result<Vec<_>, _>>()?;
1199
1200                tx.execute("DROP TABLE temp.schedule_ids", [])?;
1201
1202                cancelled_schedules
1203            };
1204
1205            tx.commit()?;
1206
1207            Ok(cancelled_schedules)
1208        })
1209        .await
1210    }
1211
1212    #[tracing::instrument(skip_all)]
1213    async fn schedules_unschedulable(
1214        &self,
1215        schedule_ids: &[Uuid],
1216        timestamp: std::time::SystemTime,
1217    ) -> eyre::Result<()> {
1218        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
1219
1220        if schedule_ids.is_empty() {
1221            return Ok(());
1222        }
1223
1224        let schedule_ids: Vec<_> = schedule_ids.into();
1225
1226        self.with_db(move |db| {
1227            let tx = db.transaction()?;
1228
1229            if schedule_ids.len() < TEMP_TABLE_THRESHOLD {
1230                let mut update_stmt = tx.prepare_cached(
1231                    r#"--sql
1232                        UPDATE ora_schedule
1233                        SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
1234                        WHERE id = :id AND marked_unschedulable_at_unix_ns IS NULL;
1235                    "#,
1236                )?;
1237
1238                for id in &schedule_ids {
1239                    update_stmt.execute(named_params![
1240                        ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
1241                        ":id": id
1242                    ])?;
1243                }
1244            } else {
1245                {
1246                    tx.execute("CREATE TABLE temp.schedule_ids (id BLOB)", [])?;
1247
1248                    let mut insert_stmt =
1249                        tx.prepare("INSERT INTO temp.schedule_ids (id) VALUES (?)")?;
1250
1251                    for id in &schedule_ids {
1252                        insert_stmt.execute(params![id])?;
1253                    }
1254                }
1255
1256                let mut update_stmt = tx.prepare_cached(
1257                    r#"--sql
1258                        UPDATE ora_schedule
1259                        SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
1260                        WHERE
1261                            id IN (SELECT id FROM temp.schedule_ids)
1262                            AND marked_unschedulable_at_unix_ns IS NULL;
1263                    "#,
1264                )?;
1265
1266                update_stmt.execute(named_params![
1267                    ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
1268                ])?;
1269
1270                tx.execute("DROP TABLE temp.schedule_ids", [])?;
1271            }
1272
1273            tx.commit()?;
1274
1275            Ok(())
1276        })
1277        .await
1278    }
1279
1280    #[tracing::instrument(skip_all)]
1281    async fn pending_schedules(
1282        &self,
1283        after: Option<Uuid>,
1284    ) -> eyre::Result<Vec<ora_storage::PendingSchedule>> {
1285        self.with_db(move |db| {
1286            let mut stmt = db.prepare_cached(
1287                r#"--sql
1288                    SELECT
1289                        id,
1290                        job_timing_policy,
1291                        job_creation_policy,
1292                        (
1293                            SELECT MAX(target_execution_time_unix_ns)
1294                            FROM ora_job
1295                            WHERE schedule_id = ora_schedule.id
1296                        ) AS last_target_execution_time_ns,
1297                        start_after_unix_ns,
1298                        end_before_unix_ns
1299                    FROM ora_schedule
1300                    WHERE
1301                        marked_unschedulable_at_unix_ns IS NULL
1302                        AND NOT EXISTS (
1303                            SELECT 1
1304                            FROM ora_job
1305                            WHERE schedule_id = ora_schedule.id
1306                            AND marked_unschedulable_at_unix_ns IS NULL
1307                        )
1308                        AND (? IS NULL OR id > ?)
1309                    ORDER BY id ASC
1310                    LIMIT 10000;
1311                "#,
1312            )?;
1313
1314            let mut rows = stmt.query([after, after])?;
1315
1316            let mut pending_schedules = Vec::new();
1317
1318            while let Some(row) = rows.next()? {
1319                pending_schedules.push(ora_storage::PendingSchedule {
1320                    id: row.get(0)?,
1321                    job_timing_policy: row
1322                        .get::<_, crate::models::ScheduleJobTimingPolicy>(1)?
1323                        .into(),
1324                    job_creation_policy: row
1325                        .get::<_, crate::models::ScheduleJobCreationPolicy>(2)?
1326                        .into(),
1327                    last_target_execution_time: row
1328                        .get::<_, Option<SqlSystemTime>>(3)?
1329                        .map(Into::into),
1330                    time_range: Some(ScheduleTimeRange {
1331                        start: row.get::<_, Option<SqlSystemTime>>(4)?.map(Into::into),
1332                        end: row.get::<_, Option<SqlSystemTime>>(5)?.map(Into::into),
1333                    }),
1334                });
1335            }
1336
1337            Ok(pending_schedules)
1338        })
1339        .await
1340    }
1341
1342    #[tracing::instrument(skip_all)]
1343    async fn query_schedules(
1344        &self,
1345        cursor: Option<String>,
1346        limit: usize,
1347        filters: ora_storage::ScheduleQueryFilters,
1348        order: ora_storage::ScheduleQueryOrder,
1349    ) -> eyre::Result<ora_storage::ScheduleQueryResult> {
1350        let cursor: Option<schedule_query::Cursor> = match cursor {
1351            Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
1352            None => None,
1353        };
1354
1355        self.with_db(move |db| {
1356            let mut tx = db.transaction()?;
1357            let res =
1358                schedule_query::query_schedule_details(&mut tx, cursor, limit, order, filters);
1359            tx.commit()?;
1360            res
1361        })
1362        .await
1363    }
1364
1365    #[tracing::instrument(skip_all)]
1366    async fn query_schedule_ids(
1367        &self,
1368        filters: ora_storage::ScheduleQueryFilters,
1369    ) -> eyre::Result<Vec<Uuid>> {
1370        self.with_db(|db| {
1371            let mut tx = db.transaction()?;
1372            let res = schedule_query::schedule_ids(&mut tx, filters);
1373            tx.commit()?;
1374            res
1375        })
1376        .await
1377    }
1378
1379    #[tracing::instrument(skip_all)]
1380    async fn count_schedules(
1381        &self,
1382        filters: ora_storage::ScheduleQueryFilters,
1383    ) -> eyre::Result<u64> {
1384        self.with_db(|db| {
1385            let mut tx = db.transaction()?;
1386            let res = schedule_query::count_schedules(&mut tx, filters);
1387            tx.commit()?;
1388            res
1389        })
1390        .await
1391    }
1392
1393    #[tracing::instrument(skip_all)]
1394    async fn delete_schedules(
1395        &self,
1396        filters: ora_storage::ScheduleQueryFilters,
1397    ) -> eyre::Result<Vec<Uuid>> {
1398        self.with_db(|db| {
1399            let mut tx = db.transaction()?;
1400            let res = schedule_query::delete_schedules(&mut tx, filters);
1401            tx.commit()?;
1402            res
1403        })
1404        .await
1405    }
1406}