1use std::{iter, path::PathBuf, sync::Arc};
4
5use async_trait::async_trait;
6use eyre::Context;
7use migrations::run_migrations;
8use models::{JobRetryPolicy, JobTimeoutPolicy, SqlSystemTime};
9use ora_storage::{NewJob, NewSchedule, ScheduleTimeRange, Storage};
10use parking_lot::Mutex;
11use rusqlite::{named_params, params, params_from_iter, Connection, OpenFlags};
12use uuid::Uuid;
13
14#[macro_use]
15mod util;
16
17mod migrations;
18mod models;
19
20mod job_query;
21mod schedule_query;
22
23mod sea_query_binder;
24
/// Default batch size at which the bulk update methods in this file stop
/// issuing one statement per id and stage the ids in a temporary table instead
/// (batches with fewer ids than this use the per-id path).
const DEFAULT_TEMP_TABLE_THRESHOLD: usize = 1000;
26
/// Storage handle backed by a fixed pool of SQLite connections.
#[derive(Debug, Clone)]
pub struct SqliteStorage {
    /// Pool of open connections; it is filled once by [`SqliteStorage::new`].
    pool: deadpool::unmanaged::Pool<Connection>,
    /// Lock held by `with_db` while its closure runs, so that closures passed
    /// to `with_db` execute one at a time.
    scheduler_mutex: Arc<Mutex<()>>,
}
36
/// Builder-style configuration consumed by [`SqliteStorage::new`].
#[must_use]
pub struct SqliteStorageConfig {
    /// Database file path; `None` selects an in-memory database.
    path: Option<PathBuf>,
    /// Flags passed to `rusqlite` when a connection is opened.
    flags: OpenFlags,
    /// Callback run on every connection that is opened for the pool.
    #[allow(clippy::type_complexity)]
    conn_init: Option<Box<dyn Fn(&mut Connection) -> eyre::Result<()>>>,
    /// Callback run only on the first connection, before `conn_init` and
    /// before migrations.
    #[allow(clippy::type_complexity)]
    init: Option<Box<dyn Fn(&mut Connection) -> eyre::Result<()>>>,
    /// Number of connections to open for the pool.
    size: usize,
}
58
59impl SqliteStorageConfig {
60 pub fn new(path: impl Into<PathBuf>) -> Self {
62 Self {
63 path: Some(path.into()),
64 conn_init: None,
65 init: None,
66 flags: OpenFlags::default(),
67 size: 1,
68 }
69 }
70
71 pub fn new_in_memory() -> Self {
73 Self {
74 path: None,
75 conn_init: None,
76 init: None,
77 flags: OpenFlags::default(),
78 size: 1,
79 }
80 }
81
82 pub fn with_init<F>(mut self, f: F) -> Self
90 where
91 F: Fn(&mut Connection) -> eyre::Result<()> + 'static,
92 {
93 self.init = Some(Box::new(f));
94 self
95 }
96
97 pub fn with_connection_init<F>(mut self, f: F) -> Self
102 where
103 F: Fn(&mut Connection) -> eyre::Result<()> + 'static,
104 {
105 self.conn_init = Some(Box::new(f));
106 self
107 }
108
109 pub fn with_flags(mut self, flags: OpenFlags) -> Self {
111 self.flags = flags;
112 self
113 }
114
115 pub fn with_connection_count(mut self, size: usize) -> Self {
123 assert!(size > 0, "pool size must be greater than 0");
124 self.size = size;
125 self
126 }
127
128 fn new_connection(&self) -> rusqlite::Result<Connection> {
130 tracing::debug!("creating new sqlite connection");
131 if let Some(path) = &self.path {
132 Connection::open_with_flags(path, self.flags)
133 } else {
134 Connection::open_in_memory_with_flags(self.flags)
135 }
136 }
137}
138
139impl std::fmt::Debug for SqliteStorageConfig {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("SqliteStorageConfig")
142 .field("path", &self.path)
143 .finish_non_exhaustive()
144 }
145}
146
147impl SqliteStorage {
148 pub fn new(mut config: SqliteStorageConfig) -> eyre::Result<Self> {
150 if config.path.is_none() && config.size > 1 {
151 tracing::warn!("sqlite in-memory storage with multiple connections is not supported");
152 config.size = 1;
153 }
154
155 let pool = deadpool::unmanaged::Pool::new(config.size);
156
157 for i in 0..config.size {
158 let mut conn = config.new_connection()?;
159
160 if i == 0 {
161 if let Some(init) = &config.init {
162 init(&mut conn).wrap_err("failed to initialize sqlite connection")?;
163 }
164
165 if let Some(conn_init) = &config.conn_init {
166 conn_init(&mut conn).wrap_err("failed to initialize sqlite connection")?;
167 }
168
169 run_migrations(&mut conn).wrap_err("failed to run migrations")?;
170 } else if let Some(conn_init) = &config.conn_init {
171 conn_init(&mut conn).wrap_err("failed to initialize sqlite connection")?;
172 }
173
174 pool.try_add(conn).map_err(|(_, e)| e)?;
175 }
176
177 Ok(Self {
178 pool,
179 scheduler_mutex: Arc::new(Mutex::new(())),
180 })
181 }
182
183 pub async fn optimize(&self) -> eyre::Result<()> {
187 self.with_db(|db| {
188 db.execute_batch(
189 r#"--sql
190 PRAGMA optimize;
191 "#,
192 )?;
193 Ok(())
194 })
195 .await
196 }
197
198 pub async fn vacuum(&self) -> eyre::Result<()> {
203 self.with_db(|db| {
204 db.execute_batch(
205 r#"--sql
206 VACUUM;
207 "#,
208 )?;
209 Ok(())
210 })
211 .await
212 }
213
214 async fn with_db_concurrent<F, O>(&self, f: F) -> eyre::Result<O>
217 where
218 F: FnOnce(&mut Connection) -> eyre::Result<O> + Send + 'static,
219 O: Send + 'static,
220 {
221 let span = tracing::Span::current();
222
223 tracing::trace!("acquiring database connection");
224 let mut conn = self.pool.get().await?;
225 tracing::trace!("acquired database connection");
226
227 tokio::task::spawn_blocking(move || {
228 let _guard = span.enter();
229 f(&mut conn)
230 })
231 .await
232 .unwrap()
233 }
234
235 async fn with_db<F, O>(&self, f: F) -> eyre::Result<O>
241 where
242 F: FnOnce(&mut Connection) -> eyre::Result<O> + Send + 'static,
243 O: Send + 'static,
244 {
245 let scheduler_mutex = self.scheduler_mutex.clone();
246 self.with_db_concurrent(move |conn| {
247 let _guard = scheduler_mutex.lock();
248 f(conn)
249 })
250 .await
251 }
252}
253
254#[async_trait]
255impl Storage for SqliteStorage {
256 #[tracing::instrument(skip_all)]
257 async fn job_types_added(&self, job_types: Vec<ora_storage::JobType>) -> eyre::Result<()> {
258 if job_types.is_empty() {
259 return Ok(());
260 }
261
262 self.with_db(|db| {
263 let tx = db.transaction()?;
264
265 {
266 let mut insert_stmt = tx.prepare(
267 r#"--sql
268 INSERT INTO ora_job_type (
269 id,
270 name,
271 description,
272 input_schema_json,
273 output_schema_json
274 )
275 VALUES (
276 :id,
277 :name,
278 :description,
279 :input_schema_json,
280 :output_schema_json
281 )
282 ON CONFLICT(id) DO UPDATE SET
283 name = excluded.name,
284 description = excluded.description,
285 input_schema_json = excluded.input_schema_json,
286 output_schema_json = excluded.output_schema_json;
287 "#,
288 )?;
289
290 for job_type in job_types {
291 insert_stmt.execute(params![
292 job_type.id,
293 job_type.name,
294 job_type.description,
295 job_type.input_schema_json,
296 job_type.output_schema_json
297 ])?;
298 }
299 }
300
301 tx.commit()?;
302
303 Ok(())
304 })
305 .await
306 }
307
308 #[tracing::instrument(skip_all)]
309 async fn jobs_added(&self, jobs: Vec<ora_storage::NewJob>) -> eyre::Result<()> {
310 if jobs.is_empty() {
311 return Ok(());
312 }
313
314 self.with_db(|db| {
315 let tx = db.transaction()?;
316 add_jobs(&tx, jobs)?;
317 tx.commit()?;
318
319 Ok(())
320 })
321 .await
322 }
323
324 async fn job_added_conditionally(
325 &self,
326 job: ora_storage::NewJob,
327 filters: ora_storage::JobQueryFilters,
328 ) -> eyre::Result<ora_storage::ConditionalJobResult> {
329 self.with_db(move |db| {
330 let mut tx = db.transaction()?;
331 let res = job_query::job_ids(&mut tx, filters)?;
332
333 if let Some(job_id) = res.first() {
334 tx.commit()?;
335 return Ok(ora_storage::ConditionalJobResult::AlreadyExists { job_id: *job_id });
336 }
337
338 add_jobs(&tx, iter::once(job))?;
339 tx.commit()?;
340
341 Ok(ora_storage::ConditionalJobResult::Added)
342 })
343 .await
344 }
345
346 #[tracing::instrument(skip_all)]
347 async fn jobs_cancelled(
348 &self,
349 job_ids: &[Uuid],
350 timestamp: std::time::SystemTime,
351 ) -> eyre::Result<Vec<ora_storage::CancelledJob>> {
352 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
353
354 if job_ids.is_empty() {
355 return Ok(vec![]);
356 }
357
358 let job_ids: Vec<_> = job_ids.into();
359
360 self.with_db(move |db| {
361 let tx = db.transaction()?;
362
363 let cancelled_jobs = if job_ids.len() < TEMP_TABLE_THRESHOLD {
364 let mut update_stmt = tx.prepare_cached(
365 r#"--sql
366 UPDATE ora_job
367 SET
368 cancelled_at_unix_ns = :cancelled_at_unix_ns,
369 marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
370 WHERE
371 id = :id
372 AND cancelled_at_unix_ns IS NULL
373 AND marked_unschedulable_at_unix_ns IS NULL
374 RETURNING id, (
375 SELECT id
376 FROM ora_execution
377 WHERE job_id = ora_job.id
378 AND active
379 LIMIT 1
380 ) AS execution_id;
381 "#,
382 )?;
383
384 let mut cancelled_jobs = Vec::new();
385
386 for id in &job_ids {
387 let mut rows = update_stmt.query_map(
388 named_params![
389 ":cancelled_at_unix_ns": SqlSystemTime(timestamp),
390 ":id": id
391 ],
392 |row| {
393 Ok(ora_storage::CancelledJob {
394 id: row.get(0)?,
395 active_execution: row.get(1)?,
396 })
397 },
398 )?;
399
400 if let Some(cancelled_job) = rows.next() {
401 let cancelled_job = cancelled_job?;
402 cancelled_jobs.push(cancelled_job);
403 }
404 }
405
406 cancelled_jobs
407 } else {
408 {
409 tx.execute("CREATE TABLE temp.job_ids (id BLOB)", [])?;
410
411 let mut insert_stmt = tx.prepare("INSERT INTO temp.job_ids (id) VALUES (?)")?;
412
413 for id in &job_ids {
414 insert_stmt.execute(params![id])?;
415 }
416 }
417
418 let mut update_stmt = tx.prepare_cached(
419 r#"--sql
420 UPDATE ora_job
421 SET
422 cancelled_at_unix_ns = :cancelled_at_unix_ns,
423 marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
424 WHERE
425 id IN (SELECT id FROM temp.job_ids)
426 AND cancelled_at_unix_ns IS NULL
427 AND marked_unschedulable_at_unix_ns IS NULL
428 RETURNING id, (
429 SELECT id
430 FROM ora_execution
431 WHERE job_id = ora_job.id
432 AND active
433 LIMIT 1
434 ) AS execution_id;
435 "#,
436 )?;
437
438 let cancelled_jobs = update_stmt
439 .query_map(
440 named_params![
441 ":cancelled_at_unix_ns": SqlSystemTime(timestamp)
442 ],
443 |row| {
444 Ok(ora_storage::CancelledJob {
445 id: row.get(0)?,
446 active_execution: row.get(1)?,
447 })
448 },
449 )?
450 .collect::<Result<Vec<ora_storage::CancelledJob>, _>>()?;
451
452 tx.execute("DROP TABLE temp.job_ids", [])?;
453
454 cancelled_jobs
455 };
456
457 tx.commit()?;
458
459 Ok(cancelled_jobs)
460 })
461 .await
462 }
463
464 #[tracing::instrument(skip_all)]
465 async fn executions_added(
466 &self,
467 executions: Vec<ora_storage::NewExecution>,
468 timestamp: std::time::SystemTime,
469 ) -> eyre::Result<()> {
470 if executions.is_empty() {
471 return Ok(());
472 }
473
474 self.with_db(move |db| {
475 let tx = db.transaction()?;
476
477 {
478 let mut insert_stmt = tx.prepare_cached(
479 r#"--sql
480 INSERT INTO ora_execution (
481 id,
482 job_id,
483 created_at_unix_ns
484 )
485 VALUES (
486 :id,
487 :job_id,
488 :created_at_unix_ns
489 );
490 "#,
491 )?;
492
493 for execution in executions {
494 insert_stmt.execute(params![
495 execution.id,
496 execution.job_id,
497 SqlSystemTime(timestamp)
498 ])?;
499 }
500 }
501
502 tx.commit()?;
503
504 Ok(())
505 })
506 .await
507 }
508
509 #[tracing::instrument(skip_all)]
510 async fn executions_ready(
511 &self,
512 execution_ids: &[Uuid],
513 timestamp: std::time::SystemTime,
514 ) -> eyre::Result<()> {
515 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
516
517 if execution_ids.is_empty() {
518 return Ok(());
519 }
520
521 let execution_ids: Vec<_> = execution_ids.into();
522
523 self.with_db(move |db| {
524 let tx = db.transaction()?;
525
526 if execution_ids.len() < TEMP_TABLE_THRESHOLD {
527 let mut update_stmt = tx.prepare_cached(
528 r#"--sql
529 UPDATE ora_execution
530 SET ready_at_unix_ns = :ready_at_unix_ns
531 WHERE id = :id;
532 "#,
533 )?;
534
535 for id in &execution_ids {
536 update_stmt.execute(params![SqlSystemTime(timestamp), id])?;
537 }
538 } else {
539 {
540 tx.execute("CREATE TABLE temp.execution_ids (id BLOB)", [])?;
541
542 let mut insert_stmt =
543 tx.prepare("INSERT INTO temp.execution_ids (id) VALUES (?)")?;
544
545 for id in &execution_ids {
546 insert_stmt.execute(params![id])?;
547 }
548 }
549
550 let mut update_stmt = tx.prepare_cached(
551 r#"--sql
552 UPDATE ora_execution
553 SET ready_at_unix_ns = :ready_at_unix_ns
554 WHERE
555 id IN (SELECT id FROM temp.execution_ids)
556 AND ready_at_unix_ns IS NULL;
557 "#,
558 )?;
559
560 update_stmt.execute(params![SqlSystemTime(timestamp)])?;
561
562 tx.execute("DROP TABLE IF EXISTS temp.execution_ids", [])?;
563 }
564
565 tx.commit()?;
566
567 Ok(())
568 })
569 .await
570 }
571
572 #[tracing::instrument(skip_all)]
573 async fn execution_assigned(
574 &self,
575 execution_id: Uuid,
576 executor_id: Uuid,
577 timestamp: std::time::SystemTime,
578 ) -> eyre::Result<()> {
579 self.with_db(move |db| {
580 let tx = db.transaction()?;
581
582 {
583 let mut update_stmt = tx.prepare_cached(
584 r#"--sql
585 UPDATE ora_execution
586 SET executor_id = :executor_id,
587 assigned_at_unix_ns = :assigned_at_unix_ns
588 WHERE id = :id;
589 "#,
590 )?;
591
592 update_stmt.execute(named_params![
593 ":executor_id": executor_id,
594 ":assigned_at_unix_ns": SqlSystemTime(timestamp),
595 ":id": execution_id
596 ])?;
597 }
598
599 tx.commit()?;
600
601 Ok(())
602 })
603 .await
604 }
605
606 #[tracing::instrument(skip_all)]
607 async fn execution_started(
608 &self,
609 execution_id: Uuid,
610 timestamp: std::time::SystemTime,
611 ) -> eyre::Result<()> {
612 self.with_db(move |db| {
613 let tx = db.transaction()?;
614
615 {
616 let mut update_stmt = tx.prepare_cached(
617 r#"--sql
618 UPDATE ora_execution
619 SET started_at_unix_ns = :started_at_unix_ns
620 WHERE id = :id;
621 "#,
622 )?;
623
624 update_stmt.execute(named_params! {
625 ":started_at_unix_ns": SqlSystemTime(timestamp),
626 ":id": execution_id
627 })?;
628 }
629
630 tx.commit()?;
631
632 Ok(())
633 })
634 .await
635 }
636
637 #[tracing::instrument(skip_all)]
638 async fn execution_succeeded(
639 &self,
640 execution_id: Uuid,
641 timestamp: std::time::SystemTime,
642 output_payload_json: String,
643 ) -> eyre::Result<()> {
644 self.with_db(move |db| {
645 let tx = db.transaction()?;
646
647 {
648 let mut update_stmt = tx.prepare_cached(
649 r#"--sql
650 UPDATE ora_execution
651 SET succeeded_at_unix_ns = :succeeded_at_unix_ns,
652 output_payload_json = :output_payload_json
653 WHERE id = :id;
654 "#,
655 )?;
656
657 update_stmt.execute(named_params![
658 ":succeeded_at_unix_ns": SqlSystemTime(timestamp),
659 ":output_payload_json": output_payload_json,
660 ":id": execution_id
661 ])?;
662 }
663
664 {
665 let mut update_stmt = tx.prepare_cached(
666 r#"--sql
667 UPDATE ora_job
668 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
669 WHERE id = (SELECT job_id FROM ora_execution WHERE id = :id);
670 "#,
671 )?;
672
673 update_stmt.execute(named_params! {
674 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
675 ":id": execution_id
676 })?;
677 }
678 tx.commit()?;
679
680 Ok(())
681 })
682 .await
683 }
684
685 #[tracing::instrument(skip_all)]
686 async fn executions_failed(
687 &self,
688 execution_ids: &[Uuid],
689 timestamp: std::time::SystemTime,
690 reason: String,
691 mark_job_unschedulable: bool,
692 ) -> eyre::Result<()> {
693 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
694
695 if execution_ids.is_empty() {
696 return Ok(());
697 }
698
699 let execution_ids: Vec<_> = execution_ids.into();
700
701 self.with_db(move |db| {
702 let tx = db.transaction()?;
703
704 if execution_ids.len() < TEMP_TABLE_THRESHOLD {
705 let mut update_stmt = tx.prepare_cached(
706 r#"--sql
707 UPDATE ora_execution
708 SET failed_at_unix_ns = :failed_at_unix_ns,
709 failure_reason = :failure_reason
710 WHERE id = :id;
711 "#,
712 )?;
713
714 for id in &execution_ids {
715 update_stmt.execute(named_params![
716 ":failed_at_unix_ns": SqlSystemTime(timestamp),
717 ":failure_reason": reason,
718 ":id": id
719 ])?;
720 }
721
722 if mark_job_unschedulable {
723 let mut update_stmt = tx.prepare_cached(
724 r#"--sql
725 UPDATE ora_job
726 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
727 WHERE id = (SELECT job_id FROM ora_execution WHERE id = :id);
728 "#,
729 )?;
730
731 for id in &execution_ids {
732 update_stmt.execute(named_params! {
733 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
734 ":id": id
735 })?;
736 }
737 }
738 } else {
739 {
740 tx.execute("CREATE TABLE temp.execution_ids (id BLOB)", [])?;
741
742 let mut insert_stmt =
743 tx.prepare("INSERT INTO temp.execution_ids (id) VALUES (?)")?;
744
745 for id in &execution_ids {
746 insert_stmt.execute(params![id])?;
747 }
748 }
749
750 let mut update_stmt = tx.prepare_cached(
751 r#"--sql
752 UPDATE ora_execution
753 SET failed_at_unix_ns = :failed_at_unix_ns,
754 failure_reason = :failure_reason
755 WHERE
756 id IN (SELECT id FROM temp.execution_ids)
757 AND failed_at_unix_ns IS NULL;
758 "#,
759 )?;
760
761 update_stmt.execute(named_params![
762 ":failed_at_unix_ns": SqlSystemTime(timestamp),
763 ":failure_reason": reason
764 ])?;
765
766 if mark_job_unschedulable {
767 let mut update_stmt = tx.prepare_cached(
768 r#"--sql
769 UPDATE ora_job
770 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
771 WHERE id IN (SELECT job_id FROM ora_execution WHERE id IN (SELECT id FROM temp.execution_ids));
772 "#,
773 )?;
774
775 update_stmt.execute(named_params! {
776 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
777 })?;
778 }
779 }
780
781
782
783 tx.execute("DROP TABLE IF EXISTS temp.execution_ids", [])?;
784
785 tx.commit()?;
786
787 Ok(())
788 })
789 .await
790 }
791
792 #[tracing::instrument(skip_all)]
793 async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>> {
794 let executor_ids: Vec<_> = executor_ids.into();
795
796 self.with_db(move |db| {
797 if executor_ids.is_empty() {
798 let mut stmt = db.prepare(
799 r#"--sql
800 SELECT id
801 FROM ora_execution
802 WHERE
803 executor_id IS NOT NULL
804 AND (
805 succeeded_at_unix_ns IS NULL
806 AND failed_at_unix_ns IS NULL
807 );
808 "#,
809 )?;
810
811 let ids = stmt
812 .query_map(params_from_iter(&executor_ids), |row| row.get(0))?
813 .collect::<Result<Vec<Uuid>, _>>()?;
814
815 Ok(ids)
816 } else {
817 let id_params_sql = "?,".repeat(executor_ids.len());
818 let id_params_sql = &id_params_sql[..id_params_sql.len() - 1];
819
820 let mut stmt = db.prepare(&format!(
821 r#"--sql
822 SELECT id
823 FROM ora_execution
824 WHERE
825 executor_id NOT IN ({id_params_sql})
826 AND (
827 succeeded_at_unix_ns IS NULL
828 AND failed_at_unix_ns IS NULL
829 );
830 "#
831 ))?;
832
833 let ids = stmt
834 .query_map(params_from_iter(&executor_ids), |row| row.get(0))?
835 .collect::<Result<Vec<Uuid>, _>>()?;
836
837 Ok(ids)
838 }
839 })
840 .await
841 }
842
843 #[tracing::instrument(skip_all)]
844 async fn jobs_unschedulable(
845 &self,
846 job_ids: &[Uuid],
847 timestamp: std::time::SystemTime,
848 ) -> eyre::Result<()> {
849 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
850
851 if job_ids.is_empty() {
852 return Ok(());
853 }
854
855 let job_ids: Vec<_> = job_ids.into();
856
857 self.with_db(move |db| {
858 let tx = db.transaction()?;
859
860 if job_ids.len() < TEMP_TABLE_THRESHOLD {
861 let mut update_stmt = tx.prepare_cached(
862 r#"--sql
863 UPDATE ora_job
864 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
865 WHERE id = :id AND marked_unschedulable_at_unix_ns IS NULL;
866 "#,
867 )?;
868
869 for id in &job_ids {
870 update_stmt.execute(named_params![
871 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
872 ":id": id
873 ])?;
874 }
875 } else {
876 {
877 tx.execute("CREATE TABLE temp.job_ids (id BLOB)", [])?;
878
879 let mut insert_stmt = tx.prepare("INSERT INTO temp.job_ids (id) VALUES (?)")?;
880
881 for id in &job_ids {
882 insert_stmt.execute(params![id])?;
883 }
884 }
885
886 let mut update_stmt = tx.prepare_cached(
887 r#"--sql
888 UPDATE ora_job
889 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
890 WHERE
891 id IN (SELECT id FROM temp.job_ids)
892 AND marked_unschedulable_at_unix_ns IS NULL;
893 "#,
894 )?;
895
896 update_stmt.execute(named_params![
897 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
898 ])?;
899
900 tx.execute("DROP TABLE temp.job_ids", [])?;
901 }
902
903 tx.commit()?;
904
905 Ok(())
906 })
907 .await
908 }
909
910 #[tracing::instrument(skip_all)]
911 async fn pending_executions(
912 &self,
913 after: Option<Uuid>,
914 ) -> eyre::Result<Vec<ora_storage::PendingExecution>> {
915 self.with_db(move |db| {
916 let mut stmt = db.prepare_cached(
917 r#"--sql
918 SELECT
919 e.id,
920 j.target_execution_time_unix_ns
921 FROM ora_execution e
922 JOIN
923 ora_job j
924 ON
925 e.job_id = j.id
926 WHERE
927 e.ready_at_unix_ns IS NULL
928 AND (? IS NULL OR e.id > ?)
929 LIMIT 10000;
930 "#,
931 )?;
932
933 let pending_executions = stmt
934 .query_map(params![after, after], |row| {
935 Ok(ora_storage::PendingExecution {
936 id: row.get(0)?,
937 target_execution_time: row.get::<_, SqlSystemTime>(1)?.into(),
938 })
939 })?
940 .collect::<Result<Vec<_>, _>>()?;
941
942 Ok(pending_executions)
943 })
944 .await
945 }
946
947 #[tracing::instrument(skip_all)]
948 async fn ready_executions(
949 &self,
950 after: Option<Uuid>,
951 ) -> eyre::Result<Vec<ora_storage::ReadyExecution>> {
952 self.with_db(move |db| {
953 let mut stmt = db.prepare_cached(
954 r#"--sql
955 SELECT
956 e.id,
957 e.job_id,
958 j.input_payload_json,
959 es.execution_count AS attempt_number,
960 j.job_type_id,
961 j.target_execution_time_unix_ns,
962 j.timeout_policy
963 FROM ora_execution e
964 JOIN ora_job j ON
965 j.id = e.job_id
966 JOIN ora_job_execution_state es ON
967 j.id = es.job_id
968 WHERE
969 ready_at_unix_ns IS NOT NULL
970 AND assigned_at_unix_ns IS NULL
971 AND started_at_unix_ns IS NULL
972 AND failed_at_unix_ns IS NULL
973 AND succeeded_at_unix_ns IS NULL
974 AND (? IS NULL OR e.id > ?)
975 ORDER BY e.id ASC
976 LIMIT 10000;
977 "#,
978 )?;
979
980 let mut rows = stmt.query([after, after])?;
981
982 let mut ready_executions = Vec::new();
983
984 while let Some(row) = rows.next()? {
985 ready_executions.push(ora_storage::ReadyExecution {
986 id: row.get(0)?,
987 job_id: row.get(1)?,
988 input_payload_json: row.get(2)?,
989 attempt_number: row.get(3)?,
990 job_type_id: row.get(4)?,
991 target_execution_time: row.get::<_, SqlSystemTime>(5)?.into(),
992 timeout_policy: row.get::<_, JobTimeoutPolicy>(6)?.into(),
993 });
994 }
995
996 Ok(ready_executions)
997 })
998 .await
999 }
1000
1001 #[tracing::instrument(skip_all)]
1002 async fn pending_jobs(
1003 &self,
1004 after: Option<Uuid>,
1005 ) -> eyre::Result<Vec<ora_storage::PendingJob>> {
1006 self.with_db(move |db| {
1007 let mut stmt = db.prepare_cached(
1008 r#"--sql
1009 SELECT
1010 ora_job.id,
1011 target_execution_time_unix_ns,
1012 es.execution_count,
1013 retry_policy,
1014 timeout_policy
1015 FROM ora_job
1016 JOIN ora_job_execution_state es ON
1017 ora_job.id = es.job_id
1018 WHERE
1019 marked_unschedulable_at_unix_ns IS NULL
1020 AND es.active_execution_id IS NULL
1021 AND (? IS NULL OR id > ?)
1022 ORDER BY ora_job.id ASC
1023 LIMIT 10000;
1024 "#,
1025 )?;
1026
1027 let mut rows = stmt.query([after, after])?;
1028
1029 let mut pending_jobs = Vec::new();
1030
1031 while let Some(row) = rows.next()? {
1032 pending_jobs.push(ora_storage::PendingJob {
1033 id: row.get(0)?,
1034 target_execution_time: row.get::<_, SqlSystemTime>(1)?.0,
1035 execution_count: row.get(2)?,
1036 retry_policy: row.get::<_, JobRetryPolicy>(3)?.into(),
1037 timeout_policy: row.get::<_, JobTimeoutPolicy>(4)?.into(),
1038 });
1039 }
1040
1041 Ok(pending_jobs)
1042 })
1043 .await
1044 }
1045
1046 #[tracing::instrument(skip_all)]
1047 async fn query_jobs(
1048 &self,
1049 cursor: Option<String>,
1050 limit: usize,
1051 order: ora_storage::JobQueryOrder,
1052 filters: ora_storage::JobQueryFilters,
1053 ) -> eyre::Result<ora_storage::JobQueryResult> {
1054 let cursor: Option<job_query::Cursor> = match cursor {
1055 Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
1056 None => None,
1057 };
1058
1059 self.with_db_concurrent(move |db| {
1060 let mut tx = db.transaction()?;
1061 let res = job_query::query_job_details(&mut tx, cursor, limit, order, filters);
1062 tx.commit()?;
1063 res
1064 })
1065 .await
1066 }
1067
1068 #[tracing::instrument(skip_all)]
1069 async fn query_job_ids(
1070 &self,
1071 filters: ora_storage::JobQueryFilters,
1072 ) -> eyre::Result<Vec<Uuid>> {
1073 self.with_db_concurrent(|db| {
1074 let mut tx = db.transaction()?;
1075 let res = job_query::job_ids(&mut tx, filters);
1076 tx.commit()?;
1077 res
1078 })
1079 .await
1080 }
1081
1082 #[tracing::instrument(skip_all)]
1083 async fn count_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<u64> {
1084 self.with_db_concurrent(|db| {
1085 let mut tx = db.transaction()?;
1086 let res = job_query::count_jobs(&mut tx, filters);
1087 tx.commit()?;
1088 res
1089 })
1090 .await
1091 }
1092
1093 #[tracing::instrument(skip_all)]
1094 async fn query_job_types(&self) -> eyre::Result<Vec<ora_storage::JobType>> {
1095 self.with_db_concurrent(|db| {
1096 let mut stmt = db.prepare_cached(
1097 r#"--sql
1098 SELECT
1099 id,
1100 name,
1101 description,
1102 input_schema_json,
1103 output_schema_json
1104 FROM ora_job_type;
1105 "#,
1106 )?;
1107
1108 let job_types = stmt
1109 .query_map([], |row| {
1110 Ok(ora_storage::JobType {
1111 id: row.get(0)?,
1112 name: row.get(1)?,
1113 description: row.get(2)?,
1114 input_schema_json: row.get(3)?,
1115 output_schema_json: row.get(4)?,
1116 })
1117 })?
1118 .collect::<Result<Vec<_>, _>>()?;
1119
1120 Ok(job_types)
1121 })
1122 .await
1123 }
1124
1125 #[tracing::instrument(skip_all)]
1126 async fn delete_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<Vec<Uuid>> {
1127 self.with_db(|db| {
1128 let mut tx = db.transaction()?;
1129 let res = job_query::delete_jobs(&mut tx, filters);
1130 tx.commit()?;
1131 res
1132 })
1133 .await
1134 }
1135
1136 #[tracing::instrument(skip_all)]
1137 async fn schedules_added(&self, schedules: Vec<ora_storage::NewSchedule>) -> eyre::Result<()> {
1138 if schedules.is_empty() {
1139 return Ok(());
1140 }
1141
1142 self.with_db(|db| {
1143 let tx = db.transaction()?;
1144 add_schedules(&tx, schedules)?;
1145 tx.commit()?;
1146
1147 Ok(())
1148 })
1149 .await
1150 }
1151
1152 async fn schedule_added_conditionally(
1153 &self,
1154 schedule: ora_storage::NewSchedule,
1155 filter: ora_storage::ScheduleQueryFilters,
1156 ) -> eyre::Result<ora_storage::ConditionalScheduleResult> {
1157 self.with_db(move |db| {
1158 let mut tx = db.transaction()?;
1159 let res = schedule_query::schedule_ids(&mut tx, filter)?;
1160
1161 if let Some(schedule_id) = res.first() {
1162 tx.commit()?;
1163 return Ok(ora_storage::ConditionalScheduleResult::AlreadyExists {
1164 schedule_id: *schedule_id,
1165 });
1166 }
1167
1168 add_schedules(&tx, iter::once(schedule))?;
1169 tx.commit()?;
1170
1171 Ok(ora_storage::ConditionalScheduleResult::Added)
1172 })
1173 .await
1174 }
1175
1176 #[tracing::instrument(skip_all)]
1177 async fn schedules_cancelled(
1178 &self,
1179 schedule_ids: &[Uuid],
1180 timestamp: std::time::SystemTime,
1181 ) -> eyre::Result<Vec<ora_storage::CancelledSchedule>> {
1182 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
1183
1184 if schedule_ids.is_empty() {
1185 return Ok(vec![]);
1186 }
1187
1188 let schedule_ids: Vec<_> = schedule_ids.into();
1189
1190 self.with_db(move |db| {
1191 let tx = db.transaction()?;
1192
1193 let cancelled_schedules = if schedule_ids.len() < TEMP_TABLE_THRESHOLD {
1194 let mut update_stmt = tx.prepare_cached(
1195 r#"--sql
1196 UPDATE ora_schedule
1197 SET
1198 cancelled_at_unix_ns = :cancelled_at_unix_ns,
1199 marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
1200 WHERE
1201 id = :id
1202 AND cancelled_at_unix_ns IS NULL
1203 AND marked_unschedulable_at_unix_ns IS NULL
1204 RETURNING id;
1205 "#,
1206 )?;
1207
1208 let mut cancelled_schedules = Vec::new();
1209
1210 for id in &schedule_ids {
1211 let mut rows = update_stmt.query_map(
1212 named_params![
1213 ":cancelled_at_unix_ns": SqlSystemTime(timestamp),
1214 ":id": id
1215 ],
1216 |row| Ok(ora_storage::CancelledSchedule { id: row.get(0)? }),
1217 )?;
1218
1219 if let Some(cancelled_schedule) = rows.next() {
1220 let cancelled_schedule = cancelled_schedule?;
1221 cancelled_schedules.push(cancelled_schedule);
1222 }
1223 }
1224
1225 cancelled_schedules
1226 } else {
1227 {
1228 tx.execute("CREATE TABLE temp.schedule_ids (id BLOB)", [])?;
1229
1230 let mut insert_stmt =
1231 tx.prepare("INSERT INTO temp.schedule_ids (id) VALUES (?)")?;
1232
1233 for id in &schedule_ids {
1234 insert_stmt.execute(params![id])?;
1235 }
1236 }
1237
1238 let mut update_stmt = tx.prepare_cached(
1239 r#"--sql
1240 UPDATE ora_schedule
1241 SET
1242 cancelled_at_unix_ns = :cancelled_at_unix_ns,
1243 marked_unschedulable_at_unix_ns = :cancelled_at_unix_ns
1244 WHERE
1245 id IN (SELECT id FROM temp.schedule_ids)
1246 AND cancelled_at_unix_ns IS NULL
1247 AND marked_unschedulable_at_unix_ns IS NULL
1248 RETURNING id;
1249 "#,
1250 )?;
1251
1252 let cancelled_schedules = update_stmt
1253 .query_map(
1254 named_params![
1255 ":cancelled_at_unix_ns": SqlSystemTime(timestamp)
1256 ],
1257 |row| Ok(ora_storage::CancelledSchedule { id: row.get(0)? }),
1258 )?
1259 .collect::<Result<Vec<_>, _>>()?;
1260
1261 tx.execute("DROP TABLE temp.schedule_ids", [])?;
1262
1263 cancelled_schedules
1264 };
1265
1266 tx.commit()?;
1267
1268 Ok(cancelled_schedules)
1269 })
1270 .await
1271 }
1272
1273 #[tracing::instrument(skip_all)]
1274 async fn schedules_unschedulable(
1275 &self,
1276 schedule_ids: &[Uuid],
1277 timestamp: std::time::SystemTime,
1278 ) -> eyre::Result<()> {
1279 const TEMP_TABLE_THRESHOLD: usize = DEFAULT_TEMP_TABLE_THRESHOLD;
1280
1281 if schedule_ids.is_empty() {
1282 return Ok(());
1283 }
1284
1285 let schedule_ids: Vec<_> = schedule_ids.into();
1286
1287 self.with_db(move |db| {
1288 let tx = db.transaction()?;
1289
1290 if schedule_ids.len() < TEMP_TABLE_THRESHOLD {
1291 let mut update_stmt = tx.prepare_cached(
1292 r#"--sql
1293 UPDATE ora_schedule
1294 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
1295 WHERE id = :id AND marked_unschedulable_at_unix_ns IS NULL;
1296 "#,
1297 )?;
1298
1299 for id in &schedule_ids {
1300 update_stmt.execute(named_params![
1301 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp),
1302 ":id": id
1303 ])?;
1304 }
1305 } else {
1306 {
1307 tx.execute("CREATE TABLE temp.schedule_ids (id BLOB)", [])?;
1308
1309 let mut insert_stmt =
1310 tx.prepare("INSERT INTO temp.schedule_ids (id) VALUES (?)")?;
1311
1312 for id in &schedule_ids {
1313 insert_stmt.execute(params![id])?;
1314 }
1315 }
1316
1317 let mut update_stmt = tx.prepare_cached(
1318 r#"--sql
1319 UPDATE ora_schedule
1320 SET marked_unschedulable_at_unix_ns = :marked_unschedulable_at_unix_ns
1321 WHERE
1322 id IN (SELECT id FROM temp.schedule_ids)
1323 AND marked_unschedulable_at_unix_ns IS NULL;
1324 "#,
1325 )?;
1326
1327 update_stmt.execute(named_params![
1328 ":marked_unschedulable_at_unix_ns": SqlSystemTime(timestamp)
1329 ])?;
1330
1331 tx.execute("DROP TABLE temp.schedule_ids", [])?;
1332 }
1333
1334 tx.commit()?;
1335
1336 Ok(())
1337 })
1338 .await
1339 }
1340
1341 #[tracing::instrument(skip_all)]
1342 async fn pending_schedules(
1343 &self,
1344 after: Option<Uuid>,
1345 ) -> eyre::Result<Vec<ora_storage::PendingSchedule>> {
1346 self.with_db(move |db| {
1347 let mut stmt = db.prepare_cached(
1348 r#"--sql
1349 SELECT
1350 id,
1351 job_timing_policy,
1352 job_creation_policy,
1353 last_target_execution_time_unix_ns,
1354 start_after_unix_ns,
1355 end_before_unix_ns
1356 FROM ora_schedule_job_state
1357 JOIN ora_schedule ON
1358 ora_schedule_job_state.schedule_id = ora_schedule.id
1359 WHERE
1360 marked_unschedulable_at_unix_ns IS NULL
1361 AND active_job_id IS NULL
1362 AND (? IS NULL OR id > ?)
1363 ORDER BY id ASC
1364 LIMIT 10000;
1365 "#,
1366 )?;
1367
1368 let mut rows = stmt.query([after, after])?;
1369
1370 let mut pending_schedules = Vec::new();
1371
1372 while let Some(row) = rows.next()? {
1373 pending_schedules.push(ora_storage::PendingSchedule {
1374 id: row.get(0)?,
1375 job_timing_policy: row
1376 .get::<_, crate::models::ScheduleJobTimingPolicy>(1)?
1377 .into(),
1378 job_creation_policy: row
1379 .get::<_, crate::models::ScheduleJobCreationPolicy>(2)?
1380 .into(),
1381 last_target_execution_time: row
1382 .get::<_, Option<SqlSystemTime>>(3)?
1383 .map(Into::into),
1384 time_range: Some(ScheduleTimeRange {
1385 start: row.get::<_, Option<SqlSystemTime>>(4)?.map(Into::into),
1386 end: row.get::<_, Option<SqlSystemTime>>(5)?.map(Into::into),
1387 }),
1388 });
1389 }
1390
1391 Ok(pending_schedules)
1392 })
1393 .await
1394 }
1395
1396 #[tracing::instrument(skip_all)]
1397 async fn query_schedules(
1398 &self,
1399 cursor: Option<String>,
1400 limit: usize,
1401 filters: ora_storage::ScheduleQueryFilters,
1402 order: ora_storage::ScheduleQueryOrder,
1403 ) -> eyre::Result<ora_storage::ScheduleQueryResult> {
1404 let cursor: Option<schedule_query::Cursor> = match cursor {
1405 Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
1406 None => None,
1407 };
1408
1409 self.with_db_concurrent(move |db| {
1410 let mut tx = db.transaction()?;
1411 let res =
1412 schedule_query::query_schedule_details(&mut tx, cursor, limit, order, filters);
1413 tx.commit()?;
1414 res
1415 })
1416 .await
1417 }
1418
1419 #[tracing::instrument(skip_all)]
1420 async fn query_schedule_ids(
1421 &self,
1422 filters: ora_storage::ScheduleQueryFilters,
1423 ) -> eyre::Result<Vec<Uuid>> {
1424 self.with_db_concurrent(|db| {
1425 let mut tx = db.transaction()?;
1426 let res = schedule_query::schedule_ids(&mut tx, filters);
1427 tx.commit()?;
1428 res
1429 })
1430 .await
1431 }
1432
1433 #[tracing::instrument(skip_all)]
1434 async fn count_schedules(
1435 &self,
1436 filters: ora_storage::ScheduleQueryFilters,
1437 ) -> eyre::Result<u64> {
1438 self.with_db_concurrent(|db| {
1439 let mut tx = db.transaction()?;
1440 let res = schedule_query::count_schedules(&mut tx, filters);
1441 tx.commit()?;
1442 res
1443 })
1444 .await
1445 }
1446
1447 #[tracing::instrument(skip_all)]
1448 async fn delete_schedules(
1449 &self,
1450 filters: ora_storage::ScheduleQueryFilters,
1451 ) -> eyre::Result<Vec<Uuid>> {
1452 self.with_db(|db| {
1453 let mut tx = db.transaction()?;
1454 let res = schedule_query::delete_schedules(&mut tx, filters);
1455 tx.commit()?;
1456 res
1457 })
1458 .await
1459 }
1460}
1461
1462fn add_jobs(
1463 tx: &rusqlite::Transaction,
1464 jobs: impl IntoIterator<Item = NewJob>,
1465) -> eyre::Result<()> {
1466 {
1467 let mut insert_stmt = tx.prepare_cached(
1468 r#"--sql
1469 INSERT INTO ora_job (
1470 id,
1471 schedule_id,
1472 created_at_unix_ns,
1473 job_type_id,
1474 target_execution_time_unix_ns,
1475 retry_policy,
1476 timeout_policy,
1477 input_payload_json,
1478 metadata_json
1479 )
1480 VALUES (
1481 :id,
1482 :schedule_id,
1483 :created_at_unix_ns,
1484 :job_type_id,
1485 :target_execution_time_unix_ns,
1486 :retry_policy,
1487 :timeout_policy,
1488 :input_payload_json,
1489 :metadata_json
1490 );
1491 "#,
1492 )?;
1493
1494 let mut insert_label_stmt = tx.prepare_cached(
1495 r#"--sql
1496 INSERT INTO ora_job_label (
1497 job_id,
1498 key,
1499 value
1500 )
1501 VALUES (
1502 :job_id,
1503 :key,
1504 :value
1505 );
1506 "#,
1507 )?;
1508
1509 for job in jobs {
1510 insert_stmt.execute(params![
1511 job.id,
1512 job.schedule_id,
1513 SqlSystemTime(job.created_at),
1514 job.job_type_id,
1515 SqlSystemTime(job.target_execution_time),
1516 JobRetryPolicy::from(job.retry_policy),
1517 JobTimeoutPolicy::from(job.timeout_policy),
1518 job.input_payload_json,
1519 job.metadata_json
1520 ])?;
1521
1522 for (key, value) in job.labels {
1523 insert_label_stmt.execute(params![job.id, key, value])?;
1524 }
1525 }
1526 }
1527 Ok(())
1528}
1529
1530fn add_schedules(
1531 tx: &rusqlite::Transaction,
1532 schedules: impl IntoIterator<Item = NewSchedule>,
1533) -> eyre::Result<()> {
1534 let mut insert_stmt = tx.prepare_cached(
1535 r#"--sql
1536 INSERT INTO ora_schedule (
1537 id,
1538 created_at_unix_ns,
1539 job_type_id,
1540 job_timing_policy,
1541 job_creation_policy,
1542 start_after_unix_ns,
1543 end_before_unix_ns,
1544 metadata_json
1545 )
1546 VALUES (
1547 :id,
1548 :created_at_unix_ns,
1549 :job_type_id,
1550 :job_timing_policy,
1551 :job_creation_policy,
1552 :start_after_unix_ns,
1553 :end_before_unix_ns,
1554 :metadata_json
1555 );
1556 "#,
1557 )?;
1558
1559 for schedule in schedules {
1560 let job_type_id = match &schedule.job_creation_policy {
1561 ora_storage::ScheduleJobCreationPolicy::JobDefinition(schedule_new_job_definition) => {
1562 schedule_new_job_definition.job_type_id.clone()
1563 }
1564 };
1565
1566 let start_after = schedule.time_range.as_ref().and_then(|r| r.start);
1567 let end_before = schedule.time_range.as_ref().and_then(|r| r.end);
1568
1569 insert_stmt.execute(params![
1570 schedule.id,
1571 SqlSystemTime(schedule.created_at),
1572 job_type_id,
1573 crate::models::ScheduleJobTimingPolicy::from(schedule.job_timing_policy),
1574 crate::models::ScheduleJobCreationPolicy::from(schedule.job_creation_policy),
1575 start_after.map(SqlSystemTime),
1576 end_before.map(SqlSystemTime),
1577 schedule.metadata_json
1578 ])?;
1579
1580 let mut insert_label_stmt = tx.prepare_cached(
1581 r#"--sql
1582 INSERT INTO ora_schedule_label (
1583 schedule_id,
1584 key,
1585 value
1586 )
1587 VALUES (
1588 :schedule_id,
1589 :key,
1590 :value
1591 );
1592 "#,
1593 )?;
1594
1595 for (key, value) in schedule.labels {
1596 insert_label_stmt.execute(params![schedule.id, key, value])?;
1597 }
1598 }
1599
1600 Ok(())
1601}