ora_storage_sqlite/
lib.rs

1//! Storage implementation for `ora` backed by sqlite.
2
3use std::{iter, path::PathBuf, sync::Arc};
4
5use async_trait::async_trait;
6use eyre::Context;
7use migrations::run_migrations;
8use models::{JobRetryPolicy, JobTimeoutPolicy, SqlSystemTime};
9use ora_storage::{NewJob, NewSchedule, ScheduleTimeRange, Storage};
10use parking_lot::Mutex;
11use rusqlite::{named_params, params, params_from_iter, Connection, OpenFlags};
12use uuid::Uuid;
13
14#[macro_use]
15mod util;
16
17mod migrations;
18mod models;
19
20mod job_query;
21mod schedule_query;
22
23mod sea_query_binder;
24
/// Number of ids at which the batch operations in this module stop executing
/// one statement per id and instead stage the ids in a temporary table.
const DEFAULT_TEMP_TABLE_THRESHOLD: usize = 1_000;
26
27/// A storage implementation backed by sqlite.
28#[derive(Debug, Clone)]
29pub struct SqliteStorage {
30    /// The sqlite connection pool.
31    pool: deadpool::unmanaged::Pool<Connection>,
32    /// A mutex that ensures that scheduler operations
33    /// are always ran sequentially.
34    scheduler_mutex: Arc<Mutex<()>>,
35}
36
37/// Configuration for the sqlite storage.
38#[must_use]
39pub struct SqliteStorageConfig {
40    /// The path to the sqlite database file.
41    ///
42    /// If not provided, a temporary in-memory database will be used.
43    path: Option<PathBuf>,
44    /// Flags that are used to configure the sqlite database.
45    flags: OpenFlags,
46    /// A function that is called whenever a new connection is created.
47    /// This can be used to set PRAGMA options on the connection.
48    #[allow(clippy::type_complexity)]
49    conn_init: Option<Box<dyn Fn(&mut Connection) -> eyre::Result<()>>>,
50    /// A function that is called whenever the first connection is created.
51    #[allow(clippy::type_complexity)]
52    init: Option<Box<dyn Fn(&mut Connection) -> eyre::Result<()>>>,
53    /// The size of the connection pool.
54    ///
55    /// If not provided, a default size of 1 will be used.
56    size: usize,
57}
58
59impl SqliteStorageConfig {
60    /// Create a new `SqliteStorageConfig` with the given path.
61    pub fn new(path: impl Into<PathBuf>) -> Self {
62        Self {
63            path: Some(path.into()),
64            conn_init: None,
65            init: None,
66            flags: OpenFlags::default(),
67            size: 1,
68        }
69    }
70
71    /// Create a new `SqliteStorageConfig` with an in-memory database.
72    pub fn new_in_memory() -> Self {
73        Self {
74            path: None,
75            conn_init: None,
76            init: None,
77            flags: OpenFlags::default(),
78            size: 1,
79        }
80    }
81
82    /// Set the function that is called whenever the first connection is created.
83    ///
84    /// Note that only one function can be set at a time,
85    /// so this will overwrite any existing function.
86    ///
87    /// Useful for setting PRAGMA options that affect the entire database
88    /// and all connections.
89    pub fn with_init<F>(mut self, f: F) -> Self
90    where
91        F: Fn(&mut Connection) -> eyre::Result<()> + 'static,
92    {
93        self.init = Some(Box::new(f));
94        self
95    }
96
97    /// Set the function that is called whenever a new connection is created.
98    ///
99    /// Note that only one function can be set at a time,
100    /// so this will overwrite any existing function.
101    pub fn with_connection_init<F>(mut self, f: F) -> Self
102    where
103        F: Fn(&mut Connection) -> eyre::Result<()> + 'static,
104    {
105        self.conn_init = Some(Box::new(f));
106        self
107    }
108
109    /// Set the flags that are used to configure the sqlite database.
110    pub fn with_flags(mut self, flags: OpenFlags) -> Self {
111        self.flags = flags;
112        self
113    }
114
115    /// Set the amount of connections to use to access the database.
116    ///
117    /// The connections are created eagerly when the storage is created.
118    ///
119    /// # Panics
120    ///
121    /// Panics if the size is 0.
122    pub fn with_connection_count(mut self, size: usize) -> Self {
123        assert!(size > 0, "pool size must be greater than 0");
124        self.size = size;
125        self
126    }
127
128    /// Create a new connection.
129    fn new_connection(&self) -> rusqlite::Result<Connection> {
130        tracing::debug!("creating new sqlite connection");
131        if let Some(path) = &self.path {
132            Connection::open_with_flags(path, self.flags)
133        } else {
134            Connection::open_in_memory_with_flags(self.flags)
135        }
136    }
137}
138
139impl std::fmt::Debug for SqliteStorageConfig {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct("SqliteStorageConfig")
142            .field("path", &self.path)
143            .finish_non_exhaustive()
144    }
145}
146
147impl SqliteStorage {
148    /// Create a new sqlite storage instance with the given connection.
149    pub fn new(mut config: SqliteStorageConfig) -> eyre::Result<Self> {
150        if config.path.is_none() && config.size > 1 {
151            tracing::warn!("sqlite in-memory storage with multiple connections is not supported");
152            config.size = 1;
153        }
154
155        let pool = deadpool::unmanaged::Pool::new(config.size);
156
157        for i in 0..config.size {
158            let mut conn = config.new_connection()?;
159
160            if i == 0 {
161                if let Some(init) = &config.init {
162                    init(&mut conn).wrap_err("failed to initialize sqlite connection")?;
163                }
164
165                if let Some(conn_init) = &config.conn_init {
166                    conn_init(&mut conn).wrap_err("failed to initialize sqlite connection")?;
167                }
168
169                run_migrations(&mut conn).wrap_err("failed to run migrations")?;
170            } else if let Some(conn_init) = &config.conn_init {
171                conn_init(&mut conn).wrap_err("failed to initialize sqlite connection")?;
172            }
173
174            pool.try_add(conn).map_err(|(_, e)| e)?;
175        }
176
177        Ok(Self {
178            pool,
179            scheduler_mutex: Arc::new(Mutex::new(())),
180        })
181    }
182
183    /// Run `optimize` on the database.
184    ///
185    /// This should be called periodically (e.g. hourly).
186    pub async fn optimize(&self) -> eyre::Result<()> {
187        self.with_db(|db| {
188            db.execute_batch(
189                r#"--sql
190                    PRAGMA optimize;
191                "#,
192            )?;
193            Ok(())
194        })
195        .await
196    }
197
198    /// Run `vacuum` on the database.
199    ///
200    /// This should be called periodically depending
201    /// on the amount of data generated and deleted (e.g. daily).
202    pub async fn vacuum(&self) -> eyre::Result<()> {
203        self.with_db(|db| {
204            db.execute_batch(
205                r#"--sql
206                    VACUUM;
207                "#,
208            )?;
209            Ok(())
210        })
211        .await
212    }
213
214    /// Run a function on a separate thread
215    /// with a mutable reference to an sqlite connection.
216    async fn with_db_concurrent<F, O>(&self, f: F) -> eyre::Result<O>
217    where
218        F: FnOnce(&mut Connection) -> eyre::Result<O> + Send + 'static,
219        O: Send + 'static,
220    {
221        let span = tracing::Span::current();
222
223        tracing::trace!("acquiring database connection");
224        let mut conn = self.pool.get().await?;
225        tracing::trace!("acquired database connection");
226
227        tokio::task::spawn_blocking(move || {
228            let _guard = span.enter();
229            f(&mut conn)
230        })
231        .await
232        .unwrap()
233    }
234
235    /// Run a function on a separate thread
236    /// with a mutable reference to an sqlite connection.
237    ///
238    /// This function is used to run scheduler operations
239    /// that need to be run sequentially.
240    async fn with_db<F, O>(&self, f: F) -> eyre::Result<O>
241    where
242        F: FnOnce(&mut Connection) -> eyre::Result<O> + Send + 'static,
243        O: Send + 'static,
244    {
245        let scheduler_mutex = self.scheduler_mutex.clone();
246        self.with_db_concurrent(move |conn| {
247            let _guard = scheduler_mutex.lock();
248            f(conn)
249        })
250        .await
251    }
252}
253
254#[async_trait]
255impl Storage for SqliteStorage {
256    #[tracing::instrument(skip_all)]
257    async fn job_types_added(&self, job_types: Vec<ora_storage::JobType>) -> eyre::Result<()> {
258        if job_types.is_empty() {
259            return Ok(());
260        }
261
262        self.with_db(|db| {
263            let tx = db.transaction()?;
264
265            {
266                let mut insert_stmt = tx.prepare(
267                    r#"--sql
268                        INSERT INTO ora_job_type (
269                            id,
270                            name,
271                            description,
272                            input_schema_json,
273                            output_schema_json
274                        )
275                        VALUES (
276                            :id,
277                            :name,
278                            :description,
279                            :input_schema_json,
280                            :output_schema_json
281                        )
282                        ON CONFLICT(id) DO UPDATE SET
283                            name = excluded.name,
284                            description = excluded.description,
285                            input_schema_json = excluded.input_schema_json,
286                            output_schema_json = excluded.output_schema_json;
287                    "#,
288                )?;
289
290                for job_type in job_types {
291                    insert_stmt.execute(params![
292                        job_type.id,
293                        job_type.name,
294                        job_type.description,
295                        job_type.input_schema_json,
296                        job_type.output_schema_json
297                    ])?;
298                }
299            }
300
301            tx.commit()?;
302
303            Ok(())
304        })
305        .await
306    }
307
308    #[tracing::instrument(skip_all)]
309    async fn jobs_added(&self, jobs: Vec<ora_storage::NewJob>) -> eyre::Result<()> {
310        if jobs.is_empty() {
311            return Ok(());
312        }
313
314        self.with_db(|db| {
315            let tx = db.transaction()?;
316            add_jobs(&tx, jobs)?;
317            tx.commit()?;
318
319            Ok(())
320        })
321        .await
322    }
323
324    async fn job_added_conditionally(
325        &self,
326        job: ora_storage::NewJob,
327        filters: ora_storage::JobQueryFilters,
328    ) -> eyre::Result<ora_storage::ConditionalJobResult> {
329        self.with_db(move |db| {
330            let mut tx = db.transaction()?;
331            let res = job_query::job_ids(&mut tx, filters)?;
332
333            if let Some(job_id) = res.first() {
334                tx.commit()?;
335                return Ok(ora_storage::ConditionalJobResult::AlreadyExists { job_id: *job_id });
336            }
337
338            add_jobs(&tx, iter::once(job))?;
339            tx.commit()?;
340
341            Ok(ora_storage::ConditionalJobResult::Added)
342        })
343        .await
344    }
345
346    #[tracing::instrument(skip_all)]
347    async fn jobs_cancelled(
348        &self,
349        job_ids: &[Uuid],
350        timestamp: std::time::SystemTime,
351    ) -> eyre::Result<Vec<ora_storage::CancelledJob>> {
352        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
353
354        if job_ids.is_empty() {
355            return Ok(vec![]);
356        }
357
358        let job_ids: Vec<_> = job_ids.into();
359
360        self.with_db(move |db| {
361            let tx = db.transaction()?;
362
363            let cancelled_jobs = if job_ids.len() < TEMP_TABLE_THRESHOLD {
364                let mut update_stmt = tx.prepare_cached(
365                    r#"--sql
366                        UPDATE ora_job
367                        SET
368                            cancelled_at_unix_ns = :cancelled_at_unix_ns,
369                            marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
370                        WHERE
371                            id = :id
372                            AND cancelled_at_unix_ns IS NULL
373                            AND marked_unschedulable_at_unix_ns IS NULL
374                        RETURNING id, (
375                            SELECT id
376                            FROM ora_execution
377                            WHERE job_id = ora_job.id
378                            AND active
379                            LIMIT 1
380                        ) AS execution_id;
381                    "#,
382                )?;
383
384                let mut cancelled_jobs = Vec::new();
385
386                for id in &job_ids {
387                    let mut rows = update_stmt.query_map(
388                        named_params![
389                            ":cancelled_at_unix_ns": SqlSystemTime(timestamp),
390                            ":id": id
391                        ],
392                        |row| {
393                            Ok(ora_storage::CancelledJob {
394                                id: row.get(0)?,
395                                active_execution: row.get(1)?,
396                            })
397                        },
398                    )?;
399
400                    if let Some(cancelled_job) = rows.next() {
401                        let cancelled_job = cancelled_job?;
402                        cancelled_jobs.push(cancelled_job);
403                    }
404                }
405
406                cancelled_jobs
407            } else {
408                {
409                    tx.execute("CREATE TABLE temp.job_ids (id BLOB)", [])?;
410
411                    let mut insert_stmt = tx.prepare("INSERT INTO temp.job_ids (id) VALUES (?)")?;
412
413                    for id in &job_ids {
414                        insert_stmt.execute(params![id])?;
415                    }
416                }
417
418                let mut update_stmt = tx.prepare_cached(
419                    r#"--sql
420                    UPDATE ora_job
421                    SET
422                        cancelled_at_unix_ns = :cancelled_at_unix_ns,
423                        marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
424                    WHERE
425                        id IN (SELECT id FROM temp.job_ids)
426                        AND cancelled_at_unix_ns IS NULL
427                        AND marked_unschedulable_at_unix_ns IS NULL
428                    RETURNING id, (
429                        SELECT id
430                        FROM ora_execution
431                        WHERE job_id = ora_job.id
432                        AND active
433                        LIMIT 1
434                    ) AS execution_id;
435                    "#,
436                )?;
437
438                let cancelled_jobs = update_stmt
439                    .query_map(
440                        named_params![
441                            ":cancelled_at_unix_ns": SqlSystemTime(timestamp)
442                        ],
443                        |row| {
444                            Ok(ora_storage::CancelledJob {
445                                id: row.get(0)?,
446                                active_execution: row.get(1)?,
447                            })
448                        },
449                    )?
450                    .collect::<Result<Vec<ora_storage::CancelledJob>, _>>()?;
451
452                tx.execute("DROP TABLE temp.job_ids", [])?;
453
454                cancelled_jobs
455            };
456
457            tx.commit()?;
458
459            Ok(cancelled_jobs)
460        })
461        .await
462    }
463
464    #[tracing::instrument(skip_all)]
465    async fn executions_added(
466        &self,
467        executions: Vec<ora_storage::NewExecution>,
468        timestamp: std::time::SystemTime,
469    ) -> eyre::Result<()> {
470        if executions.is_empty() {
471            return Ok(());
472        }
473
474        self.with_db(move |db| {
475            let tx = db.transaction()?;
476
477            {
478                let mut insert_stmt = tx.prepare_cached(
479                    r#"--sql
480                        INSERT INTO ora_execution (
481                            id,
482                            job_id,
483                            created_at_unix_ns
484                        )
485                        VALUES (
486                            :id,
487                            :job_id,
488                            :created_at_unix_ns
489                        );
490                    "#,
491                )?;
492
493                for execution in executions {
494                    insert_stmt.execute(params![
495                        execution.id,
496                        execution.job_id,
497                        SqlSystemTime(timestamp)
498                    ])?;
499                }
500            }
501
502            tx.commit()?;
503
504            Ok(())
505        })
506        .await
507    }
508
509    #[tracing::instrument(skip_all)]
510    async fn executions_ready(
511        &self,
512        execution_ids: &[Uuid],
513        timestamp: std::time::SystemTime,
514    ) -> eyre::Result<()> {
515        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
516
517        if execution_ids.is_empty() {
518            return Ok(());
519        }
520
521        let execution_ids: Vec<_> = execution_ids.into();
522
523        self.with_db(move |db| {
524            let tx = db.transaction()?;
525
526            if execution_ids.len() < TEMP_TABLE_THRESHOLD {
527                let mut update_stmt = tx.prepare_cached(
528                    r#"--sql
529                        UPDATE ora_execution
530                        SET ready_at_unix_ns = :ready_at_unix_ns
531                        WHERE id = :id;
532                    "#,
533                )?;
534
535                for id in &execution_ids {
536                    update_stmt.execute(params![SqlSystemTime(timestamp), id])?;
537                }
538            } else {
539                {
540                    tx.execute("CREATE TABLE temp.execution_ids (id BLOB)", [])?;
541
542                    let mut insert_stmt =
543                        tx.prepare("INSERT INTO temp.execution_ids (id) VALUES (?)")?;
544
545                    for id in &execution_ids {
546                        insert_stmt.execute(params![id])?;
547                    }
548                }
549
550                let mut update_stmt = tx.prepare_cached(
551                    r#"--sql
552                        UPDATE ora_execution
553                        SET ready_at_unix_ns = :ready_at_unix_ns
554                        WHERE
555                            id IN (SELECT id FROM temp.execution_ids)
556                            AND ready_at_unix_ns IS NULL;
557                    "#,
558                )?;
559
560                update_stmt.execute(params![SqlSystemTime(timestamp)])?;
561
562                tx.execute("DROP TABLE IF EXISTS temp.execution_ids", [])?;
563            }
564
565            tx.commit()?;
566
567            Ok(())
568        })
569        .await
570    }
571
572    #[tracing::instrument(skip_all)]
573    async fn execution_assigned(
574        &self,
575        execution_id: Uuid,
576        executor_id: Uuid,
577        timestamp: std::time::SystemTime,
578    ) -> eyre::Result<()> {
579        self.with_db(move |db| {
580            let tx = db.transaction()?;
581
582            {
583                let mut update_stmt = tx.prepare_cached(
584                    r#"--sql
585                        UPDATE ora_execution
586                        SET executor_id = :executor_id,
587                            assigned_at_unix_ns = :assigned_at_unix_ns
588                        WHERE id = :id;
589                    "#,
590                )?;
591
592                update_stmt.execute(named_params![
593                    ":executor_id": executor_id,
594                    ":assigned_at_unix_ns": SqlSystemTime(timestamp),
595                    ":id": execution_id
596                ])?;
597            }
598
599            tx.commit()?;
600
601            Ok(())
602        })
603        .await
604    }
605
606    #[tracing::instrument(skip_all)]
607    async fn execution_started(
608        &self,
609        execution_id: Uuid,
610        timestamp: std::time::SystemTime,
611    ) -> eyre::Result<()> {
612        self.with_db(move |db| {
613            let tx = db.transaction()?;
614
615            {
616                let mut update_stmt = tx.prepare_cached(
617                    r#"--sql
618                        UPDATE ora_execution
619                        SET started_at_unix_ns = :started_at_unix_ns
620                        WHERE id = :id;
621                    "#,
622                )?;
623
624                update_stmt.execute(named_params! {
625                    ":started_at_unix_ns": SqlSystemTime(timestamp),
626                    ":id": execution_id
627                })?;
628            }
629
630            tx.commit()?;
631
632            Ok(())
633        })
634        .await
635    }
636
637    #[tracing::instrument(skip_all)]
638    async fn execution_succeeded(
639        &self,
640        execution_id: Uuid,
641        timestamp: std::time::SystemTime,
642        output_payload_json: String,
643    ) -> eyre::Result<()> {
644        self.with_db(move |db| {
645            let tx = db.transaction()?;
646
647            {
648                let mut update_stmt = tx.prepare_cached(
649                    r#"--sql
650                        UPDATE ora_execution
651                        SET succeeded_at_unix_ns = :succeeded_at_unix_ns,
652                            output_payload_json = :output_payload_json
653                        WHERE id = :id;
654                        "#,
655                )?;
656
657                update_stmt.execute(named_params![
658                    ":succeeded_at_unix_ns": SqlSystemTime(timestamp),
659                    ":output_payload_json": output_payload_json,
660                    ":id": execution_id
661                ])?;
662            }
663
664            {
665                let mut update_stmt = tx.prepare_cached(
666                    r#"--sql
667                    UPDATE ora_job
668                    SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
669                    WHERE id = (SELECT job_id FROM ora_execution WHERE id = :id);
670                    "#,
671                )?;
672
673                update_stmt.execute(named_params! {
674                    ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
675                    ":id": execution_id
676                })?;
677            }
678            tx.commit()?;
679
680            Ok(())
681        })
682        .await
683    }
684
685    #[tracing::instrument(skip_all)]
686    async fn executions_failed(
687        &self,
688        execution_ids: &[Uuid],
689        timestamp: std::time::SystemTime,
690        reason: String,
691        mark_job_unschedulable: bool,
692    ) -> eyre::Result<()> {
693        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
694
695        if execution_ids.is_empty() {
696            return Ok(());
697        }
698
699        let execution_ids: Vec<_> = execution_ids.into();
700
701        self.with_db(move |db| {
702            let tx = db.transaction()?;
703
704            if execution_ids.len() < TEMP_TABLE_THRESHOLD {
705                let mut update_stmt = tx.prepare_cached(
706                    r#"--sql
707                        UPDATE ora_execution
708                        SET failed_at_unix_ns = :failed_at_unix_ns,
709                            failure_reason = :failure_reason
710                        WHERE id = :id;
711                    "#,
712                )?;
713
714                for id in &execution_ids {
715                    update_stmt.execute(named_params![
716                        ":failed_at_unix_ns": SqlSystemTime(timestamp),
717                        ":failure_reason": reason,
718                        ":id": id
719                    ])?;
720                }
721
722                if mark_job_unschedulable {
723                    let mut update_stmt = tx.prepare_cached(
724                        r#"--sql
725                            UPDATE ora_job
726                            SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
727                            WHERE id = (SELECT job_id FROM ora_execution WHERE id = :id);
728                        "#,
729                    )?;
730
731                    for id in &execution_ids {
732                        update_stmt.execute(named_params! {
733                            ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
734                            ":id": id
735                        })?;
736                    }
737                }
738            } else {
739                {
740                    tx.execute("CREATE TABLE temp.execution_ids (id BLOB)", [])?;
741
742                    let mut insert_stmt =
743                        tx.prepare("INSERT INTO temp.execution_ids (id) VALUES (?)")?;
744
745                    for id in &execution_ids {
746                        insert_stmt.execute(params![id])?;
747                    }
748                }
749
750                let mut update_stmt = tx.prepare_cached(
751                    r#"--sql
752                        UPDATE ora_execution
753                        SET failed_at_unix_ns = :failed_at_unix_ns,
754                            failure_reason = :failure_reason
755                        WHERE
756                            id IN (SELECT id FROM temp.execution_ids)
757                            AND failed_at_unix_ns IS NULL;
758                    "#,
759                )?;
760
761                update_stmt.execute(named_params![
762                    ":failed_at_unix_ns": SqlSystemTime(timestamp),
763                    ":failure_reason": reason
764                ])?;
765
766                if mark_job_unschedulable {
767                    let mut update_stmt = tx.prepare_cached(
768                        r#"--sql
769                            UPDATE ora_job
770                            SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
771                            WHERE id IN (SELECT job_id FROM ora_execution WHERE id IN (SELECT id FROM temp.execution_ids));
772                        "#,
773                    )?;
774
775                    update_stmt.execute(named_params! {
776                        ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
777                    })?;
778                }
779            }
780
781
782
783            tx.execute("DROP TABLE IF EXISTS temp.execution_ids", [])?;
784
785            tx.commit()?;
786
787            Ok(())
788        })
789        .await
790    }
791
792    #[tracing::instrument(skip_all)]
793    async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>> {
794        let executor_ids: Vec<_> = executor_ids.into();
795
796        self.with_db(move |db| {
797            if executor_ids.is_empty() {
798                let mut stmt = db.prepare(
799                    r#"--sql
800                        SELECT id
801                        FROM ora_execution
802                        WHERE
803                            executor_id IS NOT NULL
804                            AND (
805                                succeeded_at_unix_ns IS NULL
806                                AND failed_at_unix_ns IS NULL
807                            );
808                    "#,
809                )?;
810
811                let ids = stmt
812                    .query_map(params_from_iter(&executor_ids), |row| row.get(0))?
813                    .collect::<Result<Vec<Uuid>, _>>()?;
814
815                Ok(ids)
816            } else {
817                let id_params_sql = "?,".repeat(executor_ids.len());
818                let id_params_sql = &id_params_sql[..id_params_sql.len() - 1];
819
820                let mut stmt = db.prepare(&format!(
821                    r#"--sql
822                        SELECT id
823                        FROM ora_execution
824                        WHERE
825                            executor_id NOT IN ({id_params_sql})
826                            AND (
827                                succeeded_at_unix_ns IS NULL
828                                AND failed_at_unix_ns IS NULL
829                            );
830                    "#
831                ))?;
832
833                let ids = stmt
834                    .query_map(params_from_iter(&executor_ids), |row| row.get(0))?
835                    .collect::<Result<Vec<Uuid>, _>>()?;
836
837                Ok(ids)
838            }
839        })
840        .await
841    }
842
843    #[tracing::instrument(skip_all)]
844    async fn jobs_unschedulable(
845        &self,
846        job_ids: &[Uuid],
847        timestamp: std::time::SystemTime,
848    ) -> eyre::Result<()> {
849        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
850
851        if job_ids.is_empty() {
852            return Ok(());
853        }
854
855        let job_ids: Vec<_> = job_ids.into();
856
857        self.with_db(move |db| {
858            let tx = db.transaction()?;
859
860            if job_ids.len() < TEMP_TABLE_THRESHOLD {
861                let mut update_stmt = tx.prepare_cached(
862                    r#"--sql
863                        UPDATE ora_job
864                        SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
865                        WHERE id = :id AND marked_unschedulable_at_unix_ns IS NULL;
866                    "#,
867                )?;
868
869                for id in &job_ids {
870                    update_stmt.execute(named_params![
871                        ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
872                        ":id": id
873                    ])?;
874                }
875            } else {
876                {
877                    tx.execute("CREATE TABLE temp.job_ids (id BLOB)", [])?;
878
879                    let mut insert_stmt = tx.prepare("INSERT INTO temp.job_ids (id) VALUES (?)")?;
880
881                    for id in &job_ids {
882                        insert_stmt.execute(params![id])?;
883                    }
884                }
885
886                let mut update_stmt = tx.prepare_cached(
887                    r#"--sql
888                        UPDATE ora_job
889                        SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
890                        WHERE
891                            id IN (SELECT id FROM temp.job_ids)
892                            AND marked_unschedulable_at_unix_ns IS NULL;
893                    "#,
894                )?;
895
896                update_stmt.execute(named_params![
897                    ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
898                ])?;
899
900                tx.execute("DROP TABLE temp.job_ids", [])?;
901            }
902
903            tx.commit()?;
904
905            Ok(())
906        })
907        .await
908    }
909
910    #[tracing::instrument(skip_all)]
911    async fn pending_executions(
912        &self,
913        after: Option<Uuid>,
914    ) -> eyre::Result<Vec<ora_storage::PendingExecution>> {
915        self.with_db(move |db| {
916            let mut stmt = db.prepare_cached(
917                r#"--sql
918                    SELECT
919                        e.id,
920                        j.target_execution_time_unix_ns
921                    FROM ora_execution e
922                    JOIN
923                        ora_job j
924                    ON
925                        e.job_id = j.id
926                    WHERE
927                        e.ready_at_unix_ns IS NULL
928                        AND (? IS NULL OR e.id > ?)
929                    LIMIT 10000;
930                "#,
931            )?;
932
933            let pending_executions = stmt
934                .query_map(params![after, after], |row| {
935                    Ok(ora_storage::PendingExecution {
936                        id: row.get(0)?,
937                        target_execution_time: row.get::<_, SqlSystemTime>(1)?.into(),
938                    })
939                })?
940                .collect::<Result<Vec<_>, _>>()?;
941
942            Ok(pending_executions)
943        })
944        .await
945    }
946
947    #[tracing::instrument(skip_all)]
948    async fn ready_executions(
949        &self,
950        after: Option<Uuid>,
951    ) -> eyre::Result<Vec<ora_storage::ReadyExecution>> {
952        self.with_db(move |db| {
953            let mut stmt = db.prepare_cached(
954                r#"--sql
955                    SELECT
956                        e.id,
957                        e.job_id,
958                        j.input_payload_json,
959                        es.execution_count AS attempt_number,
960                        j.job_type_id,
961                        j.target_execution_time_unix_ns,
962                        j.timeout_policy
963                    FROM ora_execution e
964                    JOIN ora_job j ON
965                        j.id = e.job_id
966                    JOIN ora_job_execution_state es ON
967                        j.id = es.job_id
968                    WHERE
969                        ready_at_unix_ns IS NOT NULL
970                        AND assigned_at_unix_ns IS NULL
971                        AND started_at_unix_ns IS NULL
972                        AND failed_at_unix_ns IS NULL
973                        AND succeeded_at_unix_ns IS NULL
974                        AND (? IS NULL OR e.id > ?)
975                    ORDER BY e.id ASC
976                    LIMIT 10000;
977                "#,
978            )?;
979
980            let mut rows = stmt.query([after, after])?;
981
982            let mut ready_executions = Vec::new();
983
984            while let Some(row) = rows.next()? {
985                ready_executions.push(ora_storage::ReadyExecution {
986                    id: row.get(0)?,
987                    job_id: row.get(1)?,
988                    input_payload_json: row.get(2)?,
989                    attempt_number: row.get(3)?,
990                    job_type_id: row.get(4)?,
991                    target_execution_time: row.get::<_, SqlSystemTime>(5)?.into(),
992                    timeout_policy: row.get::<_, JobTimeoutPolicy>(6)?.into(),
993                });
994            }
995
996            Ok(ready_executions)
997        })
998        .await
999    }
1000
1001    #[tracing::instrument(skip_all)]
1002    async fn pending_jobs(
1003        &self,
1004        after: Option<Uuid>,
1005    ) -> eyre::Result<Vec<ora_storage::PendingJob>> {
1006        self.with_db(move |db| {
1007            let mut stmt = db.prepare_cached(
1008                r#"--sql
1009                    SELECT
1010                        ora_job.id,
1011                        target_execution_time_unix_ns,
1012                        es.execution_count,
1013                        retry_policy,
1014                        timeout_policy
1015                    FROM ora_job
1016                    JOIN ora_job_execution_state es ON
1017                        ora_job.id = es.job_id
1018                    WHERE
1019                        marked_unschedulable_at_unix_ns IS NULL
1020                        AND es.active_execution_id IS NULL
1021                        AND (? IS NULL OR id > ?)
1022                    ORDER BY ora_job.id ASC
1023                    LIMIT 10000;
1024                "#,
1025            )?;
1026
1027            let mut rows = stmt.query([after, after])?;
1028
1029            let mut pending_jobs = Vec::new();
1030
1031            while let Some(row) = rows.next()? {
1032                pending_jobs.push(ora_storage::PendingJob {
1033                    id: row.get(0)?,
1034                    target_execution_time: row.get::<_, SqlSystemTime>(1)?.0,
1035                    execution_count: row.get(2)?,
1036                    retry_policy: row.get::<_, JobRetryPolicy>(3)?.into(),
1037                    timeout_policy: row.get::<_, JobTimeoutPolicy>(4)?.into(),
1038                });
1039            }
1040
1041            Ok(pending_jobs)
1042        })
1043        .await
1044    }
1045
1046    #[tracing::instrument(skip_all)]
1047    async fn query_jobs(
1048        &self,
1049        cursor: Option<String>,
1050        limit: usize,
1051        order: ora_storage::JobQueryOrder,
1052        filters: ora_storage::JobQueryFilters,
1053    ) -> eyre::Result<ora_storage::JobQueryResult> {
1054        let cursor: Option<job_query::Cursor> = match cursor {
1055            Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
1056            None => None,
1057        };
1058
1059        self.with_db_concurrent(move |db| {
1060            let mut tx = db.transaction()?;
1061            let res = job_query::query_job_details(&mut tx, cursor, limit, order, filters);
1062            tx.commit()?;
1063            res
1064        })
1065        .await
1066    }
1067
1068    #[tracing::instrument(skip_all)]
1069    async fn query_job_ids(
1070        &self,
1071        filters: ora_storage::JobQueryFilters,
1072    ) -> eyre::Result<Vec<Uuid>> {
1073        self.with_db_concurrent(|db| {
1074            let mut tx = db.transaction()?;
1075            let res = job_query::job_ids(&mut tx, filters);
1076            tx.commit()?;
1077            res
1078        })
1079        .await
1080    }
1081
1082    #[tracing::instrument(skip_all)]
1083    async fn count_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<u64> {
1084        self.with_db_concurrent(|db| {
1085            let mut tx = db.transaction()?;
1086            let res = job_query::count_jobs(&mut tx, filters);
1087            tx.commit()?;
1088            res
1089        })
1090        .await
1091    }
1092
1093    #[tracing::instrument(skip_all)]
1094    async fn query_job_types(&self) -> eyre::Result<Vec<ora_storage::JobType>> {
1095        self.with_db_concurrent(|db| {
1096            let mut stmt = db.prepare_cached(
1097                r#"--sql
1098                    SELECT
1099                        id,
1100                        name,
1101                        description,
1102                        input_schema_json,
1103                        output_schema_json
1104                    FROM ora_job_type;
1105                "#,
1106            )?;
1107
1108            let job_types = stmt
1109                .query_map([], |row| {
1110                    Ok(ora_storage::JobType {
1111                        id: row.get(0)?,
1112                        name: row.get(1)?,
1113                        description: row.get(2)?,
1114                        input_schema_json: row.get(3)?,
1115                        output_schema_json: row.get(4)?,
1116                    })
1117                })?
1118                .collect::<Result<Vec<_>, _>>()?;
1119
1120            Ok(job_types)
1121        })
1122        .await
1123    }
1124
1125    #[tracing::instrument(skip_all)]
1126    async fn delete_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<Vec<Uuid>> {
1127        self.with_db(|db| {
1128            let mut tx = db.transaction()?;
1129            let res = job_query::delete_jobs(&mut tx, filters);
1130            tx.commit()?;
1131            res
1132        })
1133        .await
1134    }
1135
1136    #[tracing::instrument(skip_all)]
1137    async fn schedules_added(&self, schedules: Vec<ora_storage::NewSchedule>) -> eyre::Result<()> {
1138        if schedules.is_empty() {
1139            return Ok(());
1140        }
1141
1142        self.with_db(|db| {
1143            let tx = db.transaction()?;
1144            add_schedules(&tx, schedules)?;
1145            tx.commit()?;
1146
1147            Ok(())
1148        })
1149        .await
1150    }
1151
1152    async fn schedule_added_conditionally(
1153        &self,
1154        schedule: ora_storage::NewSchedule,
1155        filter: ora_storage::ScheduleQueryFilters,
1156    ) -> eyre::Result<ora_storage::ConditionalScheduleResult> {
1157        self.with_db(move |db| {
1158            let mut tx = db.transaction()?;
1159            let res = schedule_query::schedule_ids(&mut tx, filter)?;
1160
1161            if let Some(schedule_id) = res.first() {
1162                tx.commit()?;
1163                return Ok(ora_storage::ConditionalScheduleResult::AlreadyExists {
1164                    schedule_id: *schedule_id,
1165                });
1166            }
1167
1168            add_schedules(&tx, iter::once(schedule))?;
1169            tx.commit()?;
1170
1171            Ok(ora_storage::ConditionalScheduleResult::Added)
1172        })
1173        .await
1174    }
1175
1176    #[tracing::instrument(skip_all)]
1177    async fn schedules_cancelled(
1178        &self,
1179        schedule_ids: &[Uuid],
1180        timestamp: std::time::SystemTime,
1181    ) -> eyre::Result<Vec<ora_storage::CancelledSchedule>> {
1182        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
1183
1184        if schedule_ids.is_empty() {
1185            return Ok(vec![]);
1186        }
1187
1188        let schedule_ids: Vec<_> = schedule_ids.into();
1189
1190        self.with_db(move |db| {
1191            let tx = db.transaction()?;
1192
1193            let cancelled_schedules = if schedule_ids.len() < TEMP_TABLE_THRESHOLD {
1194                let mut update_stmt = tx.prepare_cached(
1195                    r#"--sql
1196                        UPDATE ora_schedule
1197                        SET
1198                            cancelled_at_unix_ns = :cancelled_at_unix_ns,
1199                            marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
1200                        WHERE
1201                            id = :id
1202                            AND cancelled_at_unix_ns IS NULL
1203                            AND marked_unschedulable_at_unix_ns IS NULL
1204                        RETURNING id;
1205                    "#,
1206                )?;
1207
1208                let mut cancelled_schedules = Vec::new();
1209
1210                for id in &schedule_ids {
1211                    let mut rows = update_stmt.query_map(
1212                        named_params![
1213                            ":cancelled_at_unix_ns": SqlSystemTime(timestamp),
1214                            ":id": id
1215                        ],
1216                        |row| Ok(ora_storage::CancelledSchedule { id: row.get(0)? }),
1217                    )?;
1218
1219                    if let Some(cancelled_schedule) = rows.next() {
1220                        let cancelled_schedule = cancelled_schedule?;
1221                        cancelled_schedules.push(cancelled_schedule);
1222                    }
1223                }
1224
1225                cancelled_schedules
1226            } else {
1227                {
1228                    tx.execute("CREATE TABLE temp.schedule_ids (id BLOB)", [])?;
1229
1230                    let mut insert_stmt =
1231                        tx.prepare("INSERT INTO temp.schedule_ids (id) VALUES (?)")?;
1232
1233                    for id in &schedule_ids {
1234                        insert_stmt.execute(params![id])?;
1235                    }
1236                }
1237
1238                let mut update_stmt = tx.prepare_cached(
1239                    r#"--sql
1240                    UPDATE ora_schedule
1241                    SET
1242                        cancelled_at_unix_ns = :cancelled_at_unix_ns,
1243                        marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
1244                    WHERE
1245                        id IN (SELECT id FROM temp.schedule_ids)
1246                        AND cancelled_at_unix_ns IS NULL
1247                        AND marked_unschedulable_at_unix_ns IS NULL
1248                    RETURNING id;
1249                    "#,
1250                )?;
1251
1252                let cancelled_schedules = update_stmt
1253                    .query_map(
1254                        named_params![
1255                            ":cancelled_at_unix_ns": SqlSystemTime(timestamp)
1256                        ],
1257                        |row| Ok(ora_storage::CancelledSchedule { id: row.get(0)? }),
1258                    )?
1259                    .collect::<Result<Vec<_>, _>>()?;
1260
1261                tx.execute("DROP TABLE temp.schedule_ids", [])?;
1262
1263                cancelled_schedules
1264            };
1265
1266            tx.commit()?;
1267
1268            Ok(cancelled_schedules)
1269        })
1270        .await
1271    }
1272
1273    #[tracing::instrument(skip_all)]
1274    async fn schedules_unschedulable(
1275        &self,
1276        schedule_ids: &[Uuid],
1277        timestamp: std::time::SystemTime,
1278    ) -> eyre::Result<()> {
1279        const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
1280
1281        if schedule_ids.is_empty() {
1282            return Ok(());
1283        }
1284
1285        let schedule_ids: Vec<_> = schedule_ids.into();
1286
1287        self.with_db(move |db| {
1288            let tx = db.transaction()?;
1289
1290            if schedule_ids.len() < TEMP_TABLE_THRESHOLD {
1291                let mut update_stmt = tx.prepare_cached(
1292                    r#"--sql
1293                        UPDATE ora_schedule
1294                        SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
1295                        WHERE id = :id AND marked_unschedulable_at_unix_ns IS NULL;
1296                    "#,
1297                )?;
1298
1299                for id in &schedule_ids {
1300                    update_stmt.execute(named_params![
1301                        ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
1302                        ":id": id
1303                    ])?;
1304                }
1305            } else {
1306                {
1307                    tx.execute("CREATE TABLE temp.schedule_ids (id BLOB)", [])?;
1308
1309                    let mut insert_stmt =
1310                        tx.prepare("INSERT INTO temp.schedule_ids (id) VALUES (?)")?;
1311
1312                    for id in &schedule_ids {
1313                        insert_stmt.execute(params![id])?;
1314                    }
1315                }
1316
1317                let mut update_stmt = tx.prepare_cached(
1318                    r#"--sql
1319                        UPDATE ora_schedule
1320                        SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
1321                        WHERE
1322                            id IN (SELECT id FROM temp.schedule_ids)
1323                            AND marked_unschedulable_at_unix_ns IS NULL;
1324                    "#,
1325                )?;
1326
1327                update_stmt.execute(named_params![
1328                    ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
1329                ])?;
1330
1331                tx.execute("DROP TABLE temp.schedule_ids", [])?;
1332            }
1333
1334            tx.commit()?;
1335
1336            Ok(())
1337        })
1338        .await
1339    }
1340
1341    #[tracing::instrument(skip_all)]
1342    async fn pending_schedules(
1343        &self,
1344        after: Option<Uuid>,
1345    ) -> eyre::Result<Vec<ora_storage::PendingSchedule>> {
1346        self.with_db(move |db| {
1347            let mut stmt = db.prepare_cached(
1348                r#"--sql
1349                    SELECT
1350                        id,
1351                        job_timing_policy,
1352                        job_creation_policy,
1353                        last_target_execution_time_unix_ns,
1354                        start_after_unix_ns,
1355                        end_before_unix_ns
1356                    FROM ora_schedule_job_state
1357                    JOIN ora_schedule ON
1358                        ora_schedule_job_state.schedule_id = ora_schedule.id
1359                    WHERE
1360                        marked_unschedulable_at_unix_ns IS NULL
1361                        AND active_job_id IS NULL
1362                        AND (? IS NULL OR id > ?)
1363                    ORDER BY id ASC
1364                    LIMIT 10000;
1365                "#,
1366            )?;
1367
1368            let mut rows = stmt.query([after, after])?;
1369
1370            let mut pending_schedules = Vec::new();
1371
1372            while let Some(row) = rows.next()? {
1373                pending_schedules.push(ora_storage::PendingSchedule {
1374                    id: row.get(0)?,
1375                    job_timing_policy: row
1376                        .get::<_, crate::models::ScheduleJobTimingPolicy>(1)?
1377                        .into(),
1378                    job_creation_policy: row
1379                        .get::<_, crate::models::ScheduleJobCreationPolicy>(2)?
1380                        .into(),
1381                    last_target_execution_time: row
1382                        .get::<_, Option<SqlSystemTime>>(3)?
1383                        .map(Into::into),
1384                    time_range: Some(ScheduleTimeRange {
1385                        start: row.get::<_, Option<SqlSystemTime>>(4)?.map(Into::into),
1386                        end: row.get::<_, Option<SqlSystemTime>>(5)?.map(Into::into),
1387                    }),
1388                });
1389            }
1390
1391            Ok(pending_schedules)
1392        })
1393        .await
1394    }
1395
1396    #[tracing::instrument(skip_all)]
1397    async fn query_schedules(
1398        &self,
1399        cursor: Option<String>,
1400        limit: usize,
1401        filters: ora_storage::ScheduleQueryFilters,
1402        order: ora_storage::ScheduleQueryOrder,
1403    ) -> eyre::Result<ora_storage::ScheduleQueryResult> {
1404        let cursor: Option<schedule_query::Cursor> = match cursor {
1405            Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
1406            None => None,
1407        };
1408
1409        self.with_db_concurrent(move |db| {
1410            let mut tx = db.transaction()?;
1411            let res =
1412                schedule_query::query_schedule_details(&mut tx, cursor, limit, order, filters);
1413            tx.commit()?;
1414            res
1415        })
1416        .await
1417    }
1418
1419    #[tracing::instrument(skip_all)]
1420    async fn query_schedule_ids(
1421        &self,
1422        filters: ora_storage::ScheduleQueryFilters,
1423    ) -> eyre::Result<Vec<Uuid>> {
1424        self.with_db_concurrent(|db| {
1425            let mut tx = db.transaction()?;
1426            let res = schedule_query::schedule_ids(&mut tx, filters);
1427            tx.commit()?;
1428            res
1429        })
1430        .await
1431    }
1432
1433    #[tracing::instrument(skip_all)]
1434    async fn count_schedules(
1435        &self,
1436        filters: ora_storage::ScheduleQueryFilters,
1437    ) -> eyre::Result<u64> {
1438        self.with_db_concurrent(|db| {
1439            let mut tx = db.transaction()?;
1440            let res = schedule_query::count_schedules(&mut tx, filters);
1441            tx.commit()?;
1442            res
1443        })
1444        .await
1445    }
1446
1447    #[tracing::instrument(skip_all)]
1448    async fn delete_schedules(
1449        &self,
1450        filters: ora_storage::ScheduleQueryFilters,
1451    ) -> eyre::Result<Vec<Uuid>> {
1452        self.with_db(|db| {
1453            let mut tx = db.transaction()?;
1454            let res = schedule_query::delete_schedules(&mut tx, filters);
1455            tx.commit()?;
1456            res
1457        })
1458        .await
1459    }
1460}
1461
1462fn add_jobs(
1463    tx: &rusqlite::Transaction,
1464    jobs: impl IntoIterator<Item = NewJob>,
1465) -> eyre::Result<()> {
1466    {
1467        let mut insert_stmt = tx.prepare_cached(
1468            r#"--sql
1469                INSERT INTO ora_job (
1470                    id,
1471                    schedule_id,
1472                    created_at_unix_ns,
1473                    job_type_id,
1474                    target_execution_time_unix_ns,
1475                    retry_policy,
1476                    timeout_policy,
1477                    input_payload_json,
1478                    metadata_json
1479                )
1480                VALUES (
1481                    :id,
1482                    :schedule_id,
1483                    :created_at_unix_ns,
1484                    :job_type_id,
1485                    :target_execution_time_unix_ns,
1486                    :retry_policy,
1487                    :timeout_policy,
1488                    :input_payload_json,
1489                    :metadata_json
1490                );
1491            "#,
1492        )?;
1493
1494        let mut insert_label_stmt = tx.prepare_cached(
1495            r#"--sql
1496                INSERT INTO ora_job_label (
1497                    job_id,
1498                    key,
1499                    value
1500                )
1501                VALUES (
1502                    :job_id,
1503                    :key,
1504                    :value
1505                );
1506            "#,
1507        )?;
1508
1509        for job in jobs {
1510            insert_stmt.execute(params![
1511                job.id,
1512                job.schedule_id,
1513                SqlSystemTime(job.created_at),
1514                job.job_type_id,
1515                SqlSystemTime(job.target_execution_time),
1516                JobRetryPolicy::from(job.retry_policy),
1517                JobTimeoutPolicy::from(job.timeout_policy),
1518                job.input_payload_json,
1519                job.metadata_json
1520            ])?;
1521
1522            for (key, value) in job.labels {
1523                insert_label_stmt.execute(params![job.id, key, value])?;
1524            }
1525        }
1526    }
1527    Ok(())
1528}
1529
1530fn add_schedules(
1531    tx: &rusqlite::Transaction,
1532    schedules: impl IntoIterator<Item = NewSchedule>,
1533) -> eyre::Result<()> {
1534    let mut insert_stmt = tx.prepare_cached(
1535        r#"--sql
1536                INSERT INTO ora_schedule (
1537                    id,
1538                    created_at_unix_ns,
1539                    job_type_id,
1540                    job_timing_policy,
1541                    job_creation_policy,
1542                    start_after_unix_ns,
1543                    end_before_unix_ns,
1544                    metadata_json
1545                )
1546                VALUES (
1547                    :id,
1548                    :created_at_unix_ns,
1549                    :job_type_id,
1550                    :job_timing_policy,
1551                    :job_creation_policy,
1552                    :start_after_unix_ns,
1553                    :end_before_unix_ns,
1554                    :metadata_json
1555                );
1556            "#,
1557    )?;
1558
1559    for schedule in schedules {
1560        let job_type_id = match &schedule.job_creation_policy {
1561            ora_storage::ScheduleJobCreationPolicy::JobDefinition(schedule_new_job_definition) => {
1562                schedule_new_job_definition.job_type_id.clone()
1563            }
1564        };
1565
1566        let start_after = schedule.time_range.as_ref().and_then(|r| r.start);
1567        let end_before = schedule.time_range.as_ref().and_then(|r| r.end);
1568
1569        insert_stmt.execute(params![
1570            schedule.id,
1571            SqlSystemTime(schedule.created_at),
1572            job_type_id,
1573            crate::models::ScheduleJobTimingPolicy::from(schedule.job_timing_policy),
1574            crate::models::ScheduleJobCreationPolicy::from(schedule.job_creation_policy),
1575            start_after.map(SqlSystemTime),
1576            end_before.map(SqlSystemTime),
1577            schedule.metadata_json
1578        ])?;
1579
1580        let mut insert_label_stmt = tx.prepare_cached(
1581            r#"--sql
1582                    INSERT INTO ora_schedule_label (
1583                        schedule_id,
1584                        key,
1585                        value
1586                    )
1587                    VALUES (
1588                        :schedule_id,
1589                        :key,
1590                        :value
1591                    );
1592                "#,
1593        )?;
1594
1595        for (key, value) in schedule.labels {
1596            insert_label_stmt.execute(params![schedule.id, key, value])?;
1597        }
1598    }
1599
1600    Ok(())
1601}