1use std::sync::Arc;
4
5use async_trait::async_trait;
6use eyre::Context;
7use migrations::run_migrations;
8use models::{JobRetryPolicy, JobTimeoutPolicy, SqlSystemTime};
9use ora_storage::{ScheduleTimeRange, Storage};
10use parking_lot::Mutex;
11use rusqlite::{named_params, params, params_from_iter, Connection};
12use uuid::Uuid;
13
14#[macro_use]
15mod util;
16
17mod migrations;
18mod models;
19
20mod job_query;
21mod schedule_query;
22
23mod sea_query_binder;
24
/// Number of IDs at which the bulk-update methods stop issuing one statement
/// per ID and instead load the IDs into a temporary table.
const DEFAULT_TEMP_TABLE_THRESHOLD: usize = 1000;
26
/// Storage implementation that persists its data through a SQLite connection.
///
/// Clones share the same underlying connection, which is guarded by a mutex.
#[derive(Debug, Clone)]
pub struct SqliteStorage {
    /// Shared connection; it is locked for the duration of every operation.
    db: Arc<Mutex<Connection>>,
}
32
33impl SqliteStorage {
34 pub fn new(mut connection: Connection) -> eyre::Result<Self> {
36 run_migrations(&mut connection).wrap_err("failed to run migrations")?;
37 Ok(Self {
38 db: Arc::new(Mutex::new(connection)),
39 })
40 }
41
42 pub async fn optimize(&self) -> eyre::Result<()> {
46 self.with_db(|db| {
47 db.execute_batch(
48 r#"--sql
49 PRAGMA optimize;
50 "#,
51 )?;
52 Ok(())
53 })
54 .await
55 }
56
57 pub async fn vacuum(&self) -> eyre::Result<()> {
62 self.with_db(|db| {
63 db.execute_batch(
64 r#"--sql
65 VACUUM;
66 "#,
67 )?;
68 Ok(())
69 })
70 .await
71 }
72
73 async fn with_db<F, O>(&self, f: F) -> eyre::Result<O>
76 where
77 F: FnOnce(&mut Connection) -> eyre::Result<O> + Send + 'static,
78 O: Send + 'static,
79 {
80 let span = tracing::Span::current();
81
82 let db = self.db.clone();
83 tokio::task::spawn_blocking(move || {
84 let _guard = span.enter();
85 tracing::trace!("acquiring database lock");
86 let mut db = db.lock();
87 tracing::trace!("acquired database lock");
88 f(&mut db)
89 })
90 .await
91 .unwrap()
92 }
93}
94
95#[async_trait]
96impl Storage for SqliteStorage {
97 #[tracing::instrument(skip_all)]
98 async fn job_types_added(&self, job_types: Vec<ora_storage::JobType>) -> eyre::Result<()> {
99 if job_types.is_empty() {
100 return Ok(());
101 }
102
103 self.with_db(|db| {
104 let tx = db.transaction()?;
105
106 {
107 let mut insert_stmt = tx.prepare(
108 r#"--sql
109 INSERT INTO ora_job_type (
110 id,
111 name,
112 description,
113 input_schema_json,
114 output_schema_json
115 )
116 VALUES (
117 :id,
118 :name,
119 :description,
120 :input_schema_json,
121 :output_schema_json
122 )
123 ON CONFLICT(id) DO UPDATE SET
124 name = excluded.name,
125 description = excluded.description,
126 input_schema_json = excluded.input_schema_json,
127 output_schema_json = excluded.output_schema_json;
128 "#,
129 )?;
130
131 for job_type in job_types {
132 insert_stmt.execute(params![
133 job_type.id,
134 job_type.name,
135 job_type.description,
136 job_type.input_schema_json,
137 job_type.output_schema_json
138 ])?;
139 }
140 }
141
142 tx.commit()?;
143
144 Ok(())
145 })
146 .await
147 }
148
149 #[tracing::instrument(skip_all)]
150 async fn jobs_added(&self, jobs: Vec<ora_storage::NewJob>) -> eyre::Result<()> {
151 if jobs.is_empty() {
152 return Ok(());
153 }
154
155 self.with_db(|db| {
156 let tx = db.transaction()?;
157
158 {
159 let mut insert_stmt = tx.prepare_cached(
160 r#"--sql
161 INSERT INTO ora_job (
162 id,
163 schedule_id,
164 created_at_unix_ns,
165 job_type_id,
166 target_execution_time_unix_ns,
167 retry_policy,
168 timeout_policy,
169 input_payload_json,
170 metadata_json
171 )
172 VALUES (
173 :id,
174 :schedule_id,
175 :created_at_unix_ns,
176 :job_type_id,
177 :target_execution_time_unix_ns,
178 :retry_policy,
179 :timeout_policy,
180 :input_payload_json,
181 :metadata_json
182 );
183 "#,
184 )?;
185
186 let mut insert_label_stmt = tx.prepare_cached(
187 r#"--sql
188 INSERT INTO ora_job_label (
189 job_id,
190 key,
191 value
192 )
193 VALUES (
194 :job_id,
195 :key,
196 :value
197 );
198 "#,
199 )?;
200
201 for job in jobs {
202 insert_stmt.execute(params![
203 job.id,
204 job.schedule_id,
205 SqlSystemTime(job.created_at),
206 job.job_type_id,
207 SqlSystemTime(job.target_execution_time),
208 JobRetryPolicy::from(job.retry_policy),
209 JobTimeoutPolicy::from(job.timeout_policy),
210 job.input_payload_json,
211 job.metadata_json
212 ])?;
213
214 for (key, value) in job.labels {
215 insert_label_stmt.execute(params![job.id, key, value])?;
216 }
217 }
218 }
219
220 tx.commit()?;
221
222 Ok(())
223 })
224 .await
225 }
226
227 #[tracing::instrument(skip_all)]
228 async fn jobs_cancelled(
229 &self,
230 job_ids: &[Uuid],
231 timestamp: std::time::SystemTime,
232 ) -> eyre::Result<Vec<ora_storage::CancelledJob>> {
233 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
234
235 if job_ids.is_empty() {
236 return Ok(vec![]);
237 }
238
239 let job_ids: Vec<_> = job_ids.into();
240
241 self.with_db(move |db| {
242 let tx = db.transaction()?;
243
244 let cancelled_jobs = if job_ids.len() < TEMP_TABLE_THRESHOLD {
245 let mut update_stmt = tx.prepare_cached(
246 r#"--sql
247 UPDATE ora_job
248 SET
249 cancelled_at_unix_ns = :cancelled_at_unix_ns,
250 marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
251 WHERE
252 id = :id
253 AND cancelled_at_unix_ns IS NULL
254 AND marked_unschedulable_at_unix_ns IS NULL
255 RETURNING id, (
256 SELECT id
257 FROM ora_execution
258 WHERE job_id = ora_job.id
259 AND active
260 LIMIT 1
261 ) AS execution_id;
262 "#,
263 )?;
264
265 let mut cancelled_jobs = Vec::new();
266
267 for id in &job_ids {
268 let mut rows = update_stmt.query_map(
269 named_params![
270 ":cancelled_at_unix_ns": SqlSystemTime(timestamp),
271 ":id": id
272 ],
273 |row| {
274 Ok(ora_storage::CancelledJob {
275 id: row.get(0)?,
276 active_execution: row.get(1)?,
277 })
278 },
279 )?;
280
281 if let Some(cancelled_job) = rows.next() {
282 let cancelled_job = cancelled_job?;
283 cancelled_jobs.push(cancelled_job);
284 }
285 }
286
287 cancelled_jobs
288 } else {
289 {
290 tx.execute("CREATE TABLE temp.job_ids (id BLOB)", [])?;
291
292 let mut insert_stmt = tx.prepare("INSERT INTO temp.job_ids (id) VALUES (?)")?;
293
294 for id in &job_ids {
295 insert_stmt.execute(params![id])?;
296 }
297 }
298
299 let mut update_stmt = tx.prepare_cached(
300 r#"--sql
301 UPDATE ora_job
302 SET
303 cancelled_at_unix_ns = :cancelled_at_unix_ns,
304 marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
305 WHERE
306 id IN (SELECT id FROM temp.job_ids)
307 AND cancelled_at_unix_ns IS NULL
308 AND marked_unschedulable_at_unix_ns IS NULL
309 RETURNING id, (
310 SELECT id
311 FROM ora_execution
312 WHERE job_id = ora_job.id
313 AND active
314 LIMIT 1
315 ) AS execution_id;
316 "#,
317 )?;
318
319 let cancelled_jobs = update_stmt
320 .query_map(
321 named_params![
322 ":cancelled_at_unix_ns": SqlSystemTime(timestamp)
323 ],
324 |row| {
325 Ok(ora_storage::CancelledJob {
326 id: row.get(0)?,
327 active_execution: row.get(1)?,
328 })
329 },
330 )?
331 .collect::<Result<Vec<ora_storage::CancelledJob>, _>>()?;
332
333 tx.execute("DROP TABLE temp.job_ids", [])?;
334
335 cancelled_jobs
336 };
337
338 tx.commit()?;
339
340 Ok(cancelled_jobs)
341 })
342 .await
343 }
344
345 #[tracing::instrument(skip_all)]
346 async fn executions_added(
347 &self,
348 executions: Vec<ora_storage::NewExecution>,
349 timestamp: std::time::SystemTime,
350 ) -> eyre::Result<()> {
351 if executions.is_empty() {
352 return Ok(());
353 }
354
355 self.with_db(move |db| {
356 let tx = db.transaction()?;
357
358 {
359 let mut insert_stmt = tx.prepare_cached(
360 r#"--sql
361 INSERT INTO ora_execution (
362 id,
363 job_id,
364 created_at_unix_ns
365 )
366 VALUES (
367 :id,
368 :job_id,
369 :created_at_unix_ns
370 );
371 "#,
372 )?;
373
374 for execution in executions {
375 insert_stmt.execute(params![
376 execution.id,
377 execution.job_id,
378 SqlSystemTime(timestamp)
379 ])?;
380 }
381 }
382
383 tx.commit()?;
384
385 Ok(())
386 })
387 .await
388 }
389
390 #[tracing::instrument(skip_all)]
391 async fn executions_ready(
392 &self,
393 execution_ids: &[Uuid],
394 timestamp: std::time::SystemTime,
395 ) -> eyre::Result<()> {
396 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
397
398 if execution_ids.is_empty() {
399 return Ok(());
400 }
401
402 let execution_ids: Vec<_> = execution_ids.into();
403
404 self.with_db(move |db| {
405 let tx = db.transaction()?;
406
407 if execution_ids.len() < TEMP_TABLE_THRESHOLD {
408 let mut update_stmt = tx.prepare_cached(
409 r#"--sql
410 UPDATE ora_execution
411 SET ready_at_unix_ns = :ready_at_unix_ns
412 WHERE id = :id;
413 "#,
414 )?;
415
416 for id in &execution_ids {
417 update_stmt.execute(params![SqlSystemTime(timestamp), id])?;
418 }
419 } else {
420 {
421 tx.execute("CREATE TABLE temp.execution_ids (id BLOB)", [])?;
422
423 let mut insert_stmt =
424 tx.prepare("INSERT INTO temp.execution_ids (id) VALUES (?)")?;
425
426 for id in &execution_ids {
427 insert_stmt.execute(params![id])?;
428 }
429 }
430
431 let mut update_stmt = tx.prepare_cached(
432 r#"--sql
433 UPDATE ora_execution
434 SET ready_at_unix_ns = :ready_at_unix_ns
435 WHERE
436 id IN (SELECT id FROM temp.execution_ids)
437 AND ready_at_unix_ns IS NULL;
438 "#,
439 )?;
440
441 update_stmt.execute(params![SqlSystemTime(timestamp)])?;
442
443 tx.execute("DROP TABLE IF EXISTS temp.execution_ids", [])?;
444 }
445
446 tx.commit()?;
447
448 Ok(())
449 })
450 .await
451 }
452
453 #[tracing::instrument(skip_all)]
454 async fn execution_assigned(
455 &self,
456 execution_id: Uuid,
457 executor_id: Uuid,
458 timestamp: std::time::SystemTime,
459 ) -> eyre::Result<()> {
460 self.with_db(move |db| {
461 let tx = db.transaction()?;
462
463 {
464 let mut update_stmt = tx.prepare_cached(
465 r#"--sql
466 UPDATE ora_execution
467 SET executor_id = :executor_id,
468 assigned_at_unix_ns = :assigned_at_unix_ns
469 WHERE id = :id;
470 "#,
471 )?;
472
473 update_stmt.execute(named_params![
474 ":executor_id": executor_id,
475 ":assigned_at_unix_ns": SqlSystemTime(timestamp),
476 ":id": execution_id
477 ])?;
478 }
479
480 tx.commit()?;
481
482 Ok(())
483 })
484 .await
485 }
486
487 #[tracing::instrument(skip_all)]
488 async fn execution_started(
489 &self,
490 execution_id: Uuid,
491 timestamp: std::time::SystemTime,
492 ) -> eyre::Result<()> {
493 self.with_db(move |db| {
494 let tx = db.transaction()?;
495
496 {
497 let mut update_stmt = tx.prepare_cached(
498 r#"--sql
499 UPDATE ora_execution
500 SET started_at_unix_ns = :started_at_unix_ns
501 WHERE id = :id;
502 "#,
503 )?;
504
505 update_stmt.execute(named_params! {
506 ":started_at_unix_ns": SqlSystemTime(timestamp),
507 ":id": execution_id
508 })?;
509 }
510
511 tx.commit()?;
512
513 Ok(())
514 })
515 .await
516 }
517
518 #[tracing::instrument(skip_all)]
519 async fn execution_succeeded(
520 &self,
521 execution_id: Uuid,
522 timestamp: std::time::SystemTime,
523 output_payload_json: String,
524 ) -> eyre::Result<()> {
525 self.with_db(move |db| {
526 let tx = db.transaction()?;
527
528 {
529 let mut update_stmt = tx.prepare_cached(
530 r#"--sql
531 UPDATE ora_execution
532 SET succeeded_at_unix_ns = :succeeded_at_unix_ns,
533 output_payload_json = :output_payload_json
534 WHERE id = :id;
535 "#,
536 )?;
537
538 update_stmt.execute(named_params![
539 ":succeeded_at_unix_ns": SqlSystemTime(timestamp),
540 ":output_payload_json": output_payload_json,
541 ":id": execution_id
542 ])?;
543 }
544
545 {
546 let mut update_stmt = tx.prepare_cached(
547 r#"--sql
548 UPDATE ora_job
549 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
550 WHERE id = (SELECT job_id FROM ora_execution WHERE id = :id);
551 "#,
552 )?;
553
554 update_stmt.execute(named_params! {
555 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
556 ":id": execution_id
557 })?;
558 }
559 tx.commit()?;
560
561 Ok(())
562 })
563 .await
564 }
565
566 #[tracing::instrument(skip_all)]
567 async fn executions_failed(
568 &self,
569 execution_ids: &[Uuid],
570 timestamp: std::time::SystemTime,
571 reason: String,
572 mark_job_unschedulable: bool,
573 ) -> eyre::Result<()> {
574 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
575
576 if execution_ids.is_empty() {
577 return Ok(());
578 }
579
580 let execution_ids: Vec<_> = execution_ids.into();
581
582 self.with_db(move |db| {
583 let tx = db.transaction()?;
584
585 if execution_ids.len() < TEMP_TABLE_THRESHOLD {
586 let mut update_stmt = tx.prepare_cached(
587 r#"--sql
588 UPDATE ora_execution
589 SET failed_at_unix_ns = :failed_at_unix_ns,
590 failure_reason = :failure_reason
591 WHERE id = :id;
592 "#,
593 )?;
594
595 for id in &execution_ids {
596 update_stmt.execute(named_params![
597 ":failed_at_unix_ns": SqlSystemTime(timestamp),
598 ":failure_reason": reason,
599 ":id": id
600 ])?;
601 }
602
603 if mark_job_unschedulable {
604 let mut update_stmt = tx.prepare_cached(
605 r#"--sql
606 UPDATE ora_job
607 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
608 WHERE id = (SELECT job_id FROM ora_execution WHERE id = :id);
609 "#,
610 )?;
611
612 for id in &execution_ids {
613 update_stmt.execute(named_params! {
614 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
615 ":id": id
616 })?;
617 }
618 }
619 } else {
620 {
621 tx.execute("CREATE TABLE temp.execution_ids (id BLOB)", [])?;
622
623 let mut insert_stmt =
624 tx.prepare("INSERT INTO temp.execution_ids (id) VALUES (?)")?;
625
626 for id in &execution_ids {
627 insert_stmt.execute(params![id])?;
628 }
629 }
630
631 let mut update_stmt = tx.prepare_cached(
632 r#"--sql
633 UPDATE ora_execution
634 SET failed_at_unix_ns = :failed_at_unix_ns,
635 failure_reason = :failure_reason
636 WHERE
637 id IN (SELECT id FROM temp.execution_ids)
638 AND failed_at_unix_ns IS NULL;
639 "#,
640 )?;
641
642 update_stmt.execute(named_params![
643 ":failed_at_unix_ns": SqlSystemTime(timestamp),
644 ":failure_reason": reason
645 ])?;
646
647 if mark_job_unschedulable {
648 let mut update_stmt = tx.prepare_cached(
649 r#"--sql
650 UPDATE ora_job
651 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
652 WHERE id IN (SELECT job_id FROM ora_execution WHERE id IN (SELECT id FROM temp.execution_ids));
653 "#,
654 )?;
655
656 update_stmt.execute(named_params! {
657 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
658 })?;
659 }
660 }
661
662
663
664 tx.execute("DROP TABLE IF EXISTS temp.execution_ids", [])?;
665
666 tx.commit()?;
667
668 Ok(())
669 })
670 .await
671 }
672
673 #[tracing::instrument(skip_all)]
674 async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>> {
675 let executor_ids: Vec<_> = executor_ids.into();
676
677 self.with_db(move |db| {
678 if executor_ids.is_empty() {
679 let mut stmt = db.prepare(
680 r#"--sql
681 SELECT id
682 FROM ora_execution
683 WHERE
684 executor_id IS NOT NULL
685 AND (
686 succeeded_at_unix_ns IS NULL
687 AND failed_at_unix_ns IS NULL
688 );
689 "#,
690 )?;
691
692 let ids = stmt
693 .query_map(params_from_iter(&executor_ids), |row| row.get(0))?
694 .collect::<Result<Vec<Uuid>, _>>()?;
695
696 Ok(ids)
697 } else {
698 let id_params_sql = "?,".repeat(executor_ids.len());
699 let id_params_sql = &id_params_sql[..id_params_sql.len() - 1];
700
701 let mut stmt = db.prepare(&format!(
702 r#"--sql
703 SELECT id
704 FROM ora_execution
705 WHERE
706 executor_id NOT IN ({id_params_sql})
707 AND (
708 succeeded_at_unix_ns IS NULL
709 AND failed_at_unix_ns IS NULL
710 );
711 "#
712 ))?;
713
714 let ids = stmt
715 .query_map(params_from_iter(&executor_ids), |row| row.get(0))?
716 .collect::<Result<Vec<Uuid>, _>>()?;
717
718 Ok(ids)
719 }
720 })
721 .await
722 }
723
724 #[tracing::instrument(skip_all)]
725 async fn jobs_unschedulable(
726 &self,
727 job_ids: &[Uuid],
728 timestamp: std::time::SystemTime,
729 ) -> eyre::Result<()> {
730 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
731
732 if job_ids.is_empty() {
733 return Ok(());
734 }
735
736 let job_ids: Vec<_> = job_ids.into();
737
738 self.with_db(move |db| {
739 let tx = db.transaction()?;
740
741 if job_ids.len() < TEMP_TABLE_THRESHOLD {
742 let mut update_stmt = tx.prepare_cached(
743 r#"--sql
744 UPDATE ora_job
745 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
746 WHERE id = :id AND marked_unschedulable_at_unix_ns IS NULL;
747 "#,
748 )?;
749
750 for id in &job_ids {
751 update_stmt.execute(named_params![
752 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
753 ":id": id
754 ])?;
755 }
756 } else {
757 {
758 tx.execute("CREATE TABLE temp.job_ids (id BLOB)", [])?;
759
760 let mut insert_stmt = tx.prepare("INSERT INTO temp.job_ids (id) VALUES (?)")?;
761
762 for id in &job_ids {
763 insert_stmt.execute(params![id])?;
764 }
765 }
766
767 let mut update_stmt = tx.prepare_cached(
768 r#"--sql
769 UPDATE ora_job
770 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
771 WHERE
772 id IN (SELECT id FROM temp.job_ids)
773 AND marked_unschedulable_at_unix_ns IS NULL;
774 "#,
775 )?;
776
777 update_stmt.execute(named_params![
778 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
779 ])?;
780
781 tx.execute("DROP TABLE temp.job_ids", [])?;
782 }
783
784 tx.commit()?;
785
786 Ok(())
787 })
788 .await
789 }
790
791 #[tracing::instrument(skip_all)]
792 async fn pending_executions(
793 &self,
794 after: Option<Uuid>,
795 ) -> eyre::Result<Vec<ora_storage::PendingExecution>> {
796 self.with_db(move |db| {
797 let mut stmt = db.prepare_cached(
798 r#"--sql
799 SELECT
800 e.id,
801 j.target_execution_time_unix_ns
802 FROM ora_execution e
803 JOIN
804 ora_job j
805 ON
806 e.job_id = j.id
807 WHERE
808 e.ready_at_unix_ns IS NULL
809 AND (? IS NULL OR e.id > ?)
810 LIMIT 10000;
811 "#,
812 )?;
813
814 let pending_executions = stmt
815 .query_map(params![after, after], |row| {
816 Ok(ora_storage::PendingExecution {
817 id: row.get(0)?,
818 target_execution_time: row.get::<_, SqlSystemTime>(1)?.into(),
819 })
820 })?
821 .collect::<Result<Vec<_>, _>>()?;
822
823 Ok(pending_executions)
824 })
825 .await
826 }
827
828 #[tracing::instrument(skip_all)]
829 async fn ready_executions(
830 &self,
831 after: Option<Uuid>,
832 ) -> eyre::Result<Vec<ora_storage::ReadyExecution>> {
833 self.with_db(move |db| {
834 let mut stmt = db.prepare_cached(
835 r#"--sql
836 SELECT
837 e.id,
838 e.job_id,
839 j.input_payload_json,
840 (SELECT COUNT(*) FROM ora_execution WHERE job_id = e.job_id) AS attempt_number,
841 j.job_type_id,
842 j.target_execution_time_unix_ns,
843 j.timeout_policy
844 FROM ora_execution e
845 JOIN
846 ora_job j
847 ON
848 j.id = e.job_id
849 WHERE
850 ready_at_unix_ns IS NOT NULL
851 AND assigned_at_unix_ns IS NULL
852 AND started_at_unix_ns IS NULL
853 AND failed_at_unix_ns IS NULL
854 AND succeeded_at_unix_ns IS NULL
855 AND (? IS NULL OR e.id > ?)
856 ORDER BY e.id ASC
857 LIMIT 10000;
858 "#,
859 )?;
860
861 let mut rows = stmt.query([
862 after,
863 after,
864 ])?;
865
866 let mut ready_executions = Vec::new();
867
868 while let Some(row) = rows.next()? {
869 ready_executions.push(ora_storage::ReadyExecution {
870 id: row.get(0)?,
871 job_id: row.get(1)?,
872 input_payload_json: row.get(2)?,
873 attempt_number: row.get(3)?,
874 job_type_id: row.get(4)?,
875 target_execution_time: row.get::<_, SqlSystemTime>(5)?.into(),
876 timeout_policy: row.get::<_, JobTimeoutPolicy>(6)?.into(),
877 });
878 }
879
880 Ok(ready_executions)
881
882 })
883 .await
884 }
885
886 #[tracing::instrument(skip_all)]
887 async fn pending_jobs(
888 &self,
889 after: Option<Uuid>,
890 ) -> eyre::Result<Vec<ora_storage::PendingJob>> {
891 self.with_db(move |db| {
892 let mut stmt = db.prepare_cached(
893 r#"--sql
894 SELECT
895 id,
896 target_execution_time_unix_ns,
897 (
898 SELECT COUNT(*)
899 FROM ora_execution
900 WHERE job_id = ora_job.id
901 ) AS execution_count,
902 retry_policy,
903 timeout_policy
904 FROM ora_job
905 WHERE
906 marked_unschedulable_at_unix_ns IS NULL
907 AND NOT EXISTS (
908 SELECT 1
909 FROM ora_execution
910 WHERE job_id = ora_job.id
911 AND succeeded_at_unix_ns IS NULL
912 AND failed_at_unix_ns IS NULL
913 )
914 AND (? IS NULL OR id > ?)
915 ORDER BY id ASC
916 LIMIT 10000;
917 "#,
918 )?;
919
920 let mut rows = stmt.query([after, after])?;
921
922 let mut pending_jobs = Vec::new();
923
924 while let Some(row) = rows.next()? {
925 pending_jobs.push(ora_storage::PendingJob {
926 id: row.get(0)?,
927 target_execution_time: row.get::<_, SqlSystemTime>(1)?.0,
928 execution_count: row.get(2)?,
929 retry_policy: row.get::<_, JobRetryPolicy>(3)?.into(),
930 timeout_policy: row.get::<_, JobTimeoutPolicy>(4)?.into(),
931 });
932 }
933
934 Ok(pending_jobs)
935 })
936 .await
937 }
938
939 #[tracing::instrument(skip_all)]
940 async fn query_jobs(
941 &self,
942 cursor: Option<String>,
943 limit: usize,
944 order: ora_storage::JobQueryOrder,
945 filters: ora_storage::JobQueryFilters,
946 ) -> eyre::Result<ora_storage::JobQueryResult> {
947 let cursor: Option<job_query::Cursor> = match cursor {
948 Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
949 None => None,
950 };
951
952 self.with_db(move |db| {
953 let mut tx = db.transaction()?;
954 let res = job_query::query_job_details(&mut tx, cursor, limit, order, filters);
955 tx.commit()?;
956 res
957 })
958 .await
959 }
960
961 #[tracing::instrument(skip_all)]
962 async fn query_job_ids(
963 &self,
964 filters: ora_storage::JobQueryFilters,
965 ) -> eyre::Result<Vec<Uuid>> {
966 self.with_db(|db| {
967 let mut tx = db.transaction()?;
968 let res = job_query::job_ids(&mut tx, filters);
969 tx.commit()?;
970 res
971 })
972 .await
973 }
974
975 #[tracing::instrument(skip_all)]
976 async fn count_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<u64> {
977 self.with_db(|db| {
978 let mut tx = db.transaction()?;
979 let res = job_query::count_jobs(&mut tx, filters);
980 tx.commit()?;
981 res
982 })
983 .await
984 }
985
986 #[tracing::instrument(skip_all)]
987 async fn query_job_types(&self) -> eyre::Result<Vec<ora_storage::JobType>> {
988 self.with_db(|db| {
989 let mut stmt = db.prepare_cached(
990 r#"--sql
991 SELECT
992 id,
993 name,
994 description,
995 input_schema_json,
996 output_schema_json
997 FROM ora_job_type;
998 "#,
999 )?;
1000
1001 let job_types = stmt
1002 .query_map([], |row| {
1003 Ok(ora_storage::JobType {
1004 id: row.get(0)?,
1005 name: row.get(1)?,
1006 description: row.get(2)?,
1007 input_schema_json: row.get(3)?,
1008 output_schema_json: row.get(4)?,
1009 })
1010 })?
1011 .collect::<Result<Vec<_>, _>>()?;
1012
1013 Ok(job_types)
1014 })
1015 .await
1016 }
1017
1018 #[tracing::instrument(skip_all)]
1019 async fn delete_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<Vec<Uuid>> {
1020 self.with_db(|db| {
1021 let mut tx = db.transaction()?;
1022 let res = job_query::delete_jobs(&mut tx, filters);
1023 tx.commit()?;
1024 res
1025 })
1026 .await
1027 }
1028
1029 #[tracing::instrument(skip_all)]
1030 async fn schedules_added(&self, schedules: Vec<ora_storage::NewSchedule>) -> eyre::Result<()> {
1031 if schedules.is_empty() {
1032 return Ok(());
1033 }
1034
1035 self.with_db(|db| {
1036 let tx = db.transaction()?;
1037
1038 {
1039 let mut insert_stmt = tx.prepare_cached(
1040 r#"--sql
1041 INSERT INTO ora_schedule (
1042 id,
1043 created_at_unix_ns,
1044 job_type_id,
1045 job_timing_policy,
1046 job_creation_policy,
1047 start_after_unix_ns,
1048 end_before_unix_ns,
1049 metadata_json
1050 )
1051 VALUES (
1052 :id,
1053 :created_at_unix_ns,
1054 :job_type_id,
1055 :job_timing_policy,
1056 :job_creation_policy,
1057 :start_after_unix_ns,
1058 :end_before_unix_ns,
1059 :metadata_json
1060 );
1061 "#,
1062 )?;
1063
1064 for schedule in schedules {
1065 let job_type_id = match &schedule.job_creation_policy {
1066 ora_storage::ScheduleJobCreationPolicy::JobDefinition(
1067 schedule_new_job_definition,
1068 ) => schedule_new_job_definition.job_type_id.clone(),
1069 };
1070
1071 let start_after = schedule.time_range.as_ref().and_then(|r| r.start);
1072 let end_before = schedule.time_range.as_ref().and_then(|r| r.end);
1073
1074 insert_stmt.execute(params![
1075 schedule.id,
1076 SqlSystemTime(schedule.created_at),
1077 job_type_id,
1078 crate::models::ScheduleJobTimingPolicy::from(schedule.job_timing_policy),
1079 crate::models::ScheduleJobCreationPolicy::from(
1080 schedule.job_creation_policy
1081 ),
1082 start_after.map(SqlSystemTime),
1083 end_before.map(SqlSystemTime),
1084 schedule.metadata_json
1085 ])?;
1086
1087 let mut insert_label_stmt = tx.prepare_cached(
1088 r#"--sql
1089 INSERT INTO ora_schedule_label (
1090 schedule_id,
1091 key,
1092 value
1093 )
1094 VALUES (
1095 :schedule_id,
1096 :key,
1097 :value
1098 );
1099 "#,
1100 )?;
1101
1102 for (key, value) in schedule.labels {
1103 insert_label_stmt.execute(params![schedule.id, key, value])?;
1104 }
1105 }
1106 }
1107
1108 tx.commit()?;
1109
1110 Ok(())
1111 })
1112 .await
1113 }
1114
1115 #[tracing::instrument(skip_all)]
1116 async fn schedules_cancelled(
1117 &self,
1118 schedule_ids: &[Uuid],
1119 timestamp: std::time::SystemTime,
1120 ) -> eyre::Result<Vec<ora_storage::CancelledSchedule>> {
1121 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
1122
1123 if schedule_ids.is_empty() {
1124 return Ok(vec![]);
1125 }
1126
1127 let schedule_ids: Vec<_> = schedule_ids.into();
1128
1129 self.with_db(move |db| {
1130 let tx = db.transaction()?;
1131
1132 let cancelled_schedules = if schedule_ids.len() < TEMP_TABLE_THRESHOLD {
1133 let mut update_stmt = tx.prepare_cached(
1134 r#"--sql
1135 UPDATE ora_schedule
1136 SET
1137 cancelled_at_unix_ns = :cancelled_at_unix_ns,
1138 marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
1139 WHERE
1140 id = :id
1141 AND cancelled_at_unix_ns IS NULL
1142 AND marked_unschedulable_at_unix_ns IS NULL
1143 RETURNING id;
1144 "#,
1145 )?;
1146
1147 let mut cancelled_schedules = Vec::new();
1148
1149 for id in &schedule_ids {
1150 let mut rows = update_stmt.query_map(
1151 named_params![
1152 ":cancelled_at_unix_ns": SqlSystemTime(timestamp),
1153 ":id": id
1154 ],
1155 |row| Ok(ora_storage::CancelledSchedule { id: row.get(0)? }),
1156 )?;
1157
1158 if let Some(cancelled_schedule) = rows.next() {
1159 let cancelled_schedule = cancelled_schedule?;
1160 cancelled_schedules.push(cancelled_schedule);
1161 }
1162 }
1163
1164 cancelled_schedules
1165 } else {
1166 {
1167 tx.execute("CREATE TABLE temp.schedule_ids (id BLOB)", [])?;
1168
1169 let mut insert_stmt =
1170 tx.prepare("INSERT INTO temp.schedule_ids (id) VALUES (?)")?;
1171
1172 for id in &schedule_ids {
1173 insert_stmt.execute(params![id])?;
1174 }
1175 }
1176
1177 let mut update_stmt = tx.prepare_cached(
1178 r#"--sql
1179 UPDATE ora_schedule
1180 SET
1181 cancelled_at_unix_ns = :cancelled_at_unix_ns,
1182 marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
1183 WHERE
1184 id IN (SELECT id FROM temp.schedule_ids)
1185 AND cancelled_at_unix_ns IS NULL
1186 AND marked_unschedulable_at_unix_ns IS NULL
1187 RETURNING id;
1188 "#,
1189 )?;
1190
1191 let cancelled_schedules = update_stmt
1192 .query_map(
1193 named_params![
1194 ":cancelled_at_unix_ns": SqlSystemTime(timestamp)
1195 ],
1196 |row| Ok(ora_storage::CancelledSchedule { id: row.get(0)? }),
1197 )?
1198 .collect::<Result<Vec<_>, _>>()?;
1199
1200 tx.execute("DROP TABLE temp.schedule_ids", [])?;
1201
1202 cancelled_schedules
1203 };
1204
1205 tx.commit()?;
1206
1207 Ok(cancelled_schedules)
1208 })
1209 .await
1210 }
1211
1212 #[tracing::instrument(skip_all)]
1213 async fn schedules_unschedulable(
1214 &self,
1215 schedule_ids: &[Uuid],
1216 timestamp: std::time::SystemTime,
1217 ) -> eyre::Result<()> {
1218 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
1219
1220 if schedule_ids.is_empty() {
1221 return Ok(());
1222 }
1223
1224 let schedule_ids: Vec<_> = schedule_ids.into();
1225
1226 self.with_db(move |db| {
1227 let tx = db.transaction()?;
1228
1229 if schedule_ids.len() < TEMP_TABLE_THRESHOLD {
1230 let mut update_stmt = tx.prepare_cached(
1231 r#"--sql
1232 UPDATE ora_schedule
1233 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
1234 WHERE id = :id AND marked_unschedulable_at_unix_ns IS NULL;
1235 "#,
1236 )?;
1237
1238 for id in &schedule_ids {
1239 update_stmt.execute(named_params![
1240 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
1241 ":id": id
1242 ])?;
1243 }
1244 } else {
1245 {
1246 tx.execute("CREATE TABLE temp.schedule_ids (id BLOB)", [])?;
1247
1248 let mut insert_stmt =
1249 tx.prepare("INSERT INTO temp.schedule_ids (id) VALUES (?)")?;
1250
1251 for id in &schedule_ids {
1252 insert_stmt.execute(params![id])?;
1253 }
1254 }
1255
1256 let mut update_stmt = tx.prepare_cached(
1257 r#"--sql
1258 UPDATE ora_schedule
1259 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
1260 WHERE
1261 id IN (SELECT id FROM temp.schedule_ids)
1262 AND marked_unschedulable_at_unix_ns IS NULL;
1263 "#,
1264 )?;
1265
1266 update_stmt.execute(named_params![
1267 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
1268 ])?;
1269
1270 tx.execute("DROP TABLE temp.schedule_ids", [])?;
1271 }
1272
1273 tx.commit()?;
1274
1275 Ok(())
1276 })
1277 .await
1278 }
1279
1280 #[tracing::instrument(skip_all)]
1281 async fn pending_schedules(
1282 &self,
1283 after: Option<Uuid>,
1284 ) -> eyre::Result<Vec<ora_storage::PendingSchedule>> {
1285 self.with_db(move |db| {
1286 let mut stmt = db.prepare_cached(
1287 r#"--sql
1288 SELECT
1289 id,
1290 job_timing_policy,
1291 job_creation_policy,
1292 (
1293 SELECT MAX(target_execution_time_unix_ns)
1294 FROM ora_job
1295 WHERE schedule_id = ora_schedule.id
1296 ) AS last_target_execution_time_ns,
1297 start_after_unix_ns,
1298 end_before_unix_ns
1299 FROM ora_schedule
1300 WHERE
1301 marked_unschedulable_at_unix_ns IS NULL
1302 AND NOT EXISTS (
1303 SELECT 1
1304 FROM ora_job
1305 WHERE schedule_id = ora_schedule.id
1306 AND marked_unschedulable_at_unix_ns IS NULL
1307 )
1308 AND (? IS NULL OR id > ?)
1309 ORDER BY id ASC
1310 LIMIT 10000;
1311 "#,
1312 )?;
1313
1314 let mut rows = stmt.query([after, after])?;
1315
1316 let mut pending_schedules = Vec::new();
1317
1318 while let Some(row) = rows.next()? {
1319 pending_schedules.push(ora_storage::PendingSchedule {
1320 id: row.get(0)?,
1321 job_timing_policy: row
1322 .get::<_, crate::models::ScheduleJobTimingPolicy>(1)?
1323 .into(),
1324 job_creation_policy: row
1325 .get::<_, crate::models::ScheduleJobCreationPolicy>(2)?
1326 .into(),
1327 last_target_execution_time: row
1328 .get::<_, Option<SqlSystemTime>>(3)?
1329 .map(Into::into),
1330 time_range: Some(ScheduleTimeRange {
1331 start: row.get::<_, Option<SqlSystemTime>>(4)?.map(Into::into),
1332 end: row.get::<_, Option<SqlSystemTime>>(5)?.map(Into::into),
1333 }),
1334 });
1335 }
1336
1337 Ok(pending_schedules)
1338 })
1339 .await
1340 }
1341
1342 #[tracing::instrument(skip_all)]
1343 async fn query_schedules(
1344 &self,
1345 cursor: Option<String>,
1346 limit: usize,
1347 filters: ora_storage::ScheduleQueryFilters,
1348 order: ora_storage::ScheduleQueryOrder,
1349 ) -> eyre::Result<ora_storage::ScheduleQueryResult> {
1350 let cursor: Option<schedule_query::Cursor> = match cursor {
1351 Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
1352 None => None,
1353 };
1354
1355 self.with_db(move |db| {
1356 let mut tx = db.transaction()?;
1357 let res =
1358 schedule_query::query_schedule_details(&mut tx, cursor, limit, order, filters);
1359 tx.commit()?;
1360 res
1361 })
1362 .await
1363 }
1364
1365 #[tracing::instrument(skip_all)]
1366 async fn query_schedule_ids(
1367 &self,
1368 filters: ora_storage::ScheduleQueryFilters,
1369 ) -> eyre::Result<Vec<Uuid>> {
1370 self.with_db(|db| {
1371 let mut tx = db.transaction()?;
1372 let res = schedule_query::schedule_ids(&mut tx, filters);
1373 tx.commit()?;
1374 res
1375 })
1376 .await
1377 }
1378
1379 #[tracing::instrument(skip_all)]
1380 async fn count_schedules(
1381 &self,
1382 filters: ora_storage::ScheduleQueryFilters,
1383 ) -> eyre::Result<u64> {
1384 self.with_db(|db| {
1385 let mut tx = db.transaction()?;
1386 let res = schedule_query::count_schedules(&mut tx, filters);
1387 tx.commit()?;
1388 res
1389 })
1390 .await
1391 }
1392
1393 #[tracing::instrument(skip_all)]
1394 async fn delete_schedules(
1395 &self,
1396 filters: ora_storage::ScheduleQueryFilters,
1397 ) -> eyre::Result<Vec<Uuid>> {
1398 self.with_db(|db| {
1399 let mut tx = db.transaction()?;
1400 let res = schedule_query::delete_schedules(&mut tx, filters);
1401 tx.commit()?;
1402 res
1403 })
1404 .await
1405 }
1406}