1use std::{mem, time::SystemTime};
4
5use async_trait::async_trait;
6use eyre::{bail, Context};
7use fjall::{PersistMode, ReadTransaction, TxKeyspace, WriteTransaction};
8use models::{ExecutionData, JobData, JobTypeData, ScheduleData};
9use ora_storage::IndexSet;
10use ora_storage::{PendingSchedule, Storage};
11use partitions::Partitions;
12use tokio::task::spawn_blocking;
13use typed::Raw;
14use util::deserialize_systemtime;
15use uuid::Uuid;
16
17#[macro_use]
18pub(crate) mod util;
19
20mod indexes;
21mod job_query;
22mod models;
23mod partitions;
24mod schedule_query;
25mod snapshot;
26mod typed;
27
/// Configuration for [`FjallStorage`].
///
/// Bundles the fjall configuration used to open the keyspace with the
/// durability that is applied to write transactions.
#[must_use]
pub struct FjallStorageConfig {
    /// Configuration used to open the transactional fjall keyspace.
    fjall_config: fjall::Config,
    /// Durability handed to every write transaction started by the storage.
    durability: Option<PersistMode>,
}
34
35impl FjallStorageConfig {
36 pub fn new(fjall_config: fjall::Config) -> Self {
38 Self {
39 fjall_config,
40 durability: Some(PersistMode::SyncAll),
41 }
42 }
43
44 pub fn transcation_durability(mut self, durability: Option<PersistMode>) -> Self {
46 self.durability = durability;
47 self
48 }
49}
50
/// Storage backend built on a transactional fjall keyspace.
///
/// The struct is `Clone`; the keyspace and partition handles are cloned into
/// the blocking task that runs each transaction.
#[derive(Clone)]
pub struct FjallStorage {
    /// Transactional keyspace from which read and write transactions are started.
    keyspace: TxKeyspace,
    /// Durability applied to every write transaction.
    durability: Option<PersistMode>,
    /// Partition handles created from `keyspace`.
    partitions: Partitions,
}
59
60impl std::fmt::Debug for FjallStorage {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("FjallStorage").finish_non_exhaustive()
63 }
64}
65
66impl FjallStorage {
67 pub fn new(config: FjallStorageConfig) -> eyre::Result<Self> {
69 let keyspace = config.fjall_config.open_transactional()?;
70
71 let partitions = Partitions::new(&keyspace)?;
72
73 Ok(Self {
74 partitions,
75 keyspace,
76 durability: config.durability,
77 })
78 }
79
80 async fn write<F, T>(&self, f: F) -> eyre::Result<T>
81 where
82 F: for<'a> FnOnce(WriteTransaction<'a>, &'a Partitions) -> eyre::Result<T> + Send + 'static,
83 T: Send + 'static,
84 {
85 let keyspace = self.keyspace.clone();
86 let partitions = self.partitions.clone();
87 let durability = self.durability;
88
89 spawn_blocking(move || f(keyspace.write_tx().durability(durability), &partitions))
90 .await
91 .map_err(eyre::Report::from)?
92 }
93
94 async fn read<F, T>(&self, f: F) -> eyre::Result<T>
95 where
96 F: for<'a> FnOnce(ReadTransaction, &'a Partitions) -> eyre::Result<T> + Send + 'static,
97 T: Send + 'static,
98 {
99 let keyspace = self.keyspace.clone();
100 let partitions = self.partitions.clone();
101
102 spawn_blocking(move || f(keyspace.read_tx(), &partitions))
103 .await
104 .map_err(eyre::Report::from)?
105 }
106}
107
108#[async_trait]
109impl Storage for FjallStorage {
110 async fn job_types_added(&self, job_types: Vec<ora_storage::JobType>) -> eyre::Result<()> {
111 self.write(|mut tx, partitions| {
112 for mut job_type in job_types {
113 let job_type_id = mem::take(&mut job_type.id);
114 let job_type_data = JobTypeData::from(job_type);
115
116 partitions
117 .job_types
118 .write(&mut tx)
119 .insert(&job_type_id, &job_type_data);
120 }
121
122 tx.commit()?;
123
124 Ok(())
125 })
126 .await
127 }
128
129 async fn jobs_added(&self, jobs: Vec<ora_storage::NewJob>) -> eyre::Result<()> {
130 self.write(|mut tx, partitions| {
131 for job in jobs {
132 let job_data = JobData::from(job);
133
134 partitions
135 .active_jobs
136 .write(&mut tx)
137 .insert(&job_data.id, &job_data);
138
139 for (key, value) in job_data.labels {
140 let label_key = indexes::LabelIndexKey::new(&key, &value, job_data.id);
141 partitions
142 .idx_job_labels
143 .write(&mut tx)
144 .insert(&label_key, &());
145 }
146
147 partitions
148 .idx_pending_jobs
149 .write(&mut tx)
150 .insert(&job_data.id, &());
151
152 if let Some(schedule_id) = job_data.schedule_id {
153 let schedule_job_key =
154 indexes::ScheduleJobIndexKey::new(schedule_id, job_data.id);
155 partitions
156 .idx_schedule_jobs
157 .write(&mut tx)
158 .insert(&schedule_job_key, &());
159 partitions
160 .idx_schedule_active_job
161 .write(&mut tx)
162 .insert(&schedule_id, &job_data.id);
163 partitions
164 .idx_job_schedule
165 .write(&mut tx)
166 .insert(&job_data.id, &schedule_id);
167 partitions
168 .idx_pending_schedules
169 .write(&mut tx)
170 .remove(&schedule_id);
171 }
172 }
173
174 tx.commit()?;
175
176 Ok(())
177 })
178 .await
179 }
180
181 async fn jobs_cancelled(
182 &self,
183 job_ids: &[Uuid],
184 timestamp: SystemTime,
185 ) -> eyre::Result<Vec<ora_storage::CancelledJob>> {
186 let mut job_ids: Vec<_> = job_ids.into();
187 job_ids.sort();
188
189 self.write(move |mut tx, partitions| {
190 let mut cancelled_jobs = Vec::with_capacity(job_ids.len());
191
192 for job_id in job_ids {
193 if let Some(job_data) = partitions.active_jobs.write(&mut tx).get(&job_id)? {
194 let job_data = job_data.value();
195
196 if job_data.cancelled_at.is_some() {
197 continue;
198 }
199
200 let mut job_data = deserialize!(job_data)?;
201 job_data.cancelled_at = Some(timestamp);
202
203 partitions
204 .active_jobs
205 .write(&mut tx)
206 .insert(&job_id, &job_data);
207 job_unschedulable(partitions, &mut tx, job_id, timestamp)?;
208
209 let active_execution = partitions
210 .idx_job_active_execution
211 .write(&mut tx)
212 .get(&job_id)?
213 .map(|id| id.value());
214
215 cancelled_jobs.push(ora_storage::CancelledJob {
216 id: job_id,
217 active_execution,
218 });
219 }
220 }
221
222 tx.commit()?;
223
224 Ok(cancelled_jobs)
225 })
226 .await
227 }
228
229 async fn executions_added(
230 &self,
231 executions: Vec<ora_storage::NewExecution>,
232 timestamp: SystemTime,
233 ) -> eyre::Result<()> {
234 self.write(move |mut tx, partitions| {
235 for execution in executions {
236 let Some(job) = partitions
237 .active_jobs
238 .write(&mut tx)
239 .get(&execution.job_id)?
240 else {
241 continue;
242 };
243
244 let target_execution_time =
245 deserialize_systemtime(job.value().target_execution_time);
246
247 let execution_data = ExecutionData {
248 id: execution.id,
249 job_id: execution.job_id,
250 executor_id: None,
251 created_at: timestamp,
252 ready_at: None,
253 assigned_at: None,
254 started_at: None,
255 succeeded_at: None,
256 failed_at: None,
257 output_payload_json: None,
258 failure_reason: None,
259 target_execution_time,
260 };
261 partitions
262 .pending_executions
263 .write(&mut tx)
264 .insert(&execution.id, &execution_data);
265
266 partitions.idx_job_executions.write(&mut tx).insert(
267 &indexes::JobExecutionIndexKey::new(execution.job_id, execution.id),
268 &(),
269 );
270 partitions
271 .idx_job_active_execution
272 .write(&mut tx)
273 .insert(&execution.job_id, &execution.id);
274
275 partitions
276 .idx_pending_jobs
277 .write(&mut tx)
278 .remove(&execution.job_id);
279 }
280
281 tx.commit()?;
282
283 Ok(())
284 })
285 .await
286 }
287
288 async fn executions_ready(
289 &self,
290 execution_ids: &[Uuid],
291 timestamp: SystemTime,
292 ) -> eyre::Result<()> {
293 let mut execution_ids: Vec<_> = execution_ids.into();
294 execution_ids.sort();
295
296 self.write(move |mut tx, partitions| {
297 for execution_id in execution_ids {
298 if let Some(execution_data) = partitions
299 .pending_executions
300 .write(&mut tx)
301 .take(&execution_id)?
302 {
303 let mut execution_data = deserialize!(execution_data.value())?;
304 execution_data.ready_at = Some(timestamp);
305
306 partitions
307 .ready_executions
308 .write(&mut tx)
309 .insert(&execution_id, &execution_data);
310 }
311 }
312
313 tx.commit()?;
314
315 Ok(())
316 })
317 .await
318 }
319
320 async fn execution_assigned(
321 &self,
322 execution_id: Uuid,
323 executor_id: Uuid,
324 timestamp: SystemTime,
325 ) -> eyre::Result<()> {
326 self.write(move |mut tx, partitions| {
327 if let Some(execution_data) = partitions
328 .ready_executions
329 .write(&mut tx)
330 .take(&execution_id)?
331 {
332 let mut execution_data = deserialize!(execution_data.value())?;
333 execution_data.assigned_at = Some(timestamp);
334 execution_data.executor_id = Some(executor_id);
335
336 partitions
337 .assigned_executions
338 .write(&mut tx)
339 .insert(&execution_id, &execution_data);
340 }
341
342 tx.commit()?;
343
344 Ok(())
345 })
346 .await
347 }
348
349 async fn execution_started(
350 &self,
351 execution_id: Uuid,
352 timestamp: SystemTime,
353 ) -> eyre::Result<()> {
354 self.write(move |mut tx, partitions| {
355 if let Some(execution_data) = partitions
356 .assigned_executions
357 .write(&mut tx)
358 .take(&execution_id)?
359 {
360 let mut execution_data = deserialize!(execution_data.value())?;
361 execution_data.started_at = Some(timestamp);
362
363 partitions
364 .running_executions
365 .write(&mut tx)
366 .insert(&execution_id, &execution_data);
367 }
368
369 tx.commit()?;
370
371 Ok(())
372 })
373 .await
374 }
375
376 async fn execution_succeeded(
377 &self,
378 execution_id: Uuid,
379 timestamp: SystemTime,
380 output_payload_json: String,
381 ) -> eyre::Result<()> {
382 self.write(move |mut tx, partitions| {
383 if let Some(execution_data) = partitions
384 .running_executions
385 .write(&mut tx)
386 .take(&execution_id)?
387 {
388 let mut execution_data = deserialize!(execution_data.value())?;
389 execution_data.succeeded_at = Some(timestamp);
390 execution_data.output_payload_json = Some(output_payload_json);
391
392 partitions
393 .succeeded_executions
394 .write(&mut tx)
395 .insert(&execution_id, &execution_data);
396
397 partitions
398 .idx_job_active_execution
399 .write(&mut tx)
400 .remove(&execution_data.job_id);
401
402 job_unschedulable(partitions, &mut tx, execution_data.job_id, timestamp)?;
403 }
404
405 tx.commit()?;
406
407 Ok(())
408 })
409 .await
410 }
411
412 async fn executions_failed(
413 &self,
414 execution_ids: &[Uuid],
415 timestamp: SystemTime,
416 reason: String,
417 mark_job_unschedulable: bool,
418 ) -> eyre::Result<()> {
419 let mut execution_ids: Vec<_> = execution_ids.into();
420 execution_ids.sort();
421
422 self.write(move |mut tx, partitions| {
423 let search_partitions = [
426 &partitions.running_executions,
427 &partitions.assigned_executions,
428 &partitions.ready_executions,
429 &partitions.pending_executions,
430 ];
431
432 for execution_id in execution_ids {
433 'partition_search: for partition in search_partitions {
434 if let Some(execution_data) = partition.write(&mut tx).take(&execution_id)? {
435 let mut execution_data = deserialize!(execution_data.value())?;
436 execution_data.failed_at = Some(timestamp);
437 execution_data.failure_reason = Some(reason.clone());
438
439 partitions
440 .failed_executions
441 .write(&mut tx)
442 .insert(&execution_id, &execution_data);
443
444 partitions
445 .idx_job_active_execution
446 .write(&mut tx)
447 .remove(&execution_data.job_id);
448
449 if mark_job_unschedulable {
450 job_unschedulable(
451 partitions,
452 &mut tx,
453 execution_data.job_id,
454 timestamp,
455 )?;
456 } else {
457 partitions
458 .idx_pending_jobs
459 .write(&mut tx)
460 .insert(&execution_data.job_id, &());
461 }
462
463 break 'partition_search;
464 }
465 }
466 }
467
468 tx.commit()?;
469
470 Ok(())
471 })
472 .await
473 }
474
475 async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>> {
476 let executor_ids: IndexSet<_> = executor_ids.iter().copied().collect();
477
478 self.read(move |tx, partitions| {
479 let mut orphaned_execution_ids = Vec::new();
480
481 let search_partitions = [
482 &partitions.assigned_executions,
483 &partitions.running_executions,
484 ];
485
486 for partition in search_partitions {
487 for execution in partition.read(&tx).iter() {
488 let (_, execution) = execution?;
489 let execution = execution.value();
490
491 if !executor_ids.contains(&execution.executor_id.unwrap()) {
492 orphaned_execution_ids.push(execution.id);
493 }
494 }
495 }
496
497 Ok(orphaned_execution_ids)
498 })
499 .await
500 }
501
502 async fn jobs_unschedulable(
503 &self,
504 job_ids: &[Uuid],
505 timestamp: SystemTime,
506 ) -> eyre::Result<()> {
507 let mut job_ids: Vec<_> = job_ids.into();
508 job_ids.sort();
509
510 self.write(move |mut tx, partitions| {
511 for job_id in job_ids {
512 job_unschedulable(partitions, &mut tx, job_id, timestamp)?;
513 }
514
515 tx.commit()?;
516
517 Ok(())
518 })
519 .await
520 }
521
522 async fn pending_executions(
523 &self,
524 after: Option<Uuid>,
525 ) -> eyre::Result<Vec<ora_storage::PendingExecution>> {
526 self.read(move |tx, partitions| {
527 let mut pending_executions = Vec::new();
528
529 if let Some(after) = after {
530 for pending_execution in partitions.pending_executions.read(&tx).range(after..) {
531 let (_, pending_execution) = pending_execution?;
532 let pending_execution = pending_execution.value();
533
534 if pending_execution.id <= after {
535 continue;
536 }
537
538 pending_executions.push(ora_storage::PendingExecution {
539 id: pending_execution.id,
540 target_execution_time: deserialize_systemtime(
541 pending_execution.target_execution_time,
542 ),
543 });
544 }
545 } else {
546 for pending_execution in partitions.pending_executions.read(&tx).iter() {
547 let (_, pending_execution) = pending_execution?;
548 let pending_execution = pending_execution.value();
549
550 pending_executions.push(ora_storage::PendingExecution {
551 id: pending_execution.id,
552 target_execution_time: deserialize_systemtime(
553 pending_execution.target_execution_time,
554 ),
555 });
556 }
557 }
558
559 Ok(pending_executions)
560 })
561 .await
562 }
563
564 async fn ready_executions(
565 &self,
566 after: Option<Uuid>,
567 ) -> eyre::Result<Vec<ora_storage::ReadyExecution>> {
568 self.read(move |tx, partitions| {
569 const BATCH_SIZE: usize = 10_000;
570
571 let mut ready_executions = Vec::with_capacity(BATCH_SIZE);
572
573 let mut add_execution = |ready_execution: Raw<ExecutionData>| -> eyre::Result<()> {
574 let ready_execution = ready_execution.value();
575
576 let job_id = ready_execution.job_id;
577 let job_data = if let Some(job_data) =
578 partitions.active_jobs.read(&tx).get(&job_id)?
579 {
580 job_data
581 } else if let Some(job_data) = partitions.inactive_jobs.read(&tx).get(&job_id)? {
582 job_data
583 } else {
584 tracing::error!(%job_id, "found ready execution for missing job");
585 return Ok(());
586 };
587 let job_data = job_data.value();
588
589 let job_execution_count = partitions
590 .idx_job_executions
591 .read(&tx)
592 .prefix(&indexes::JobExecutionIndexKey::new_prefix(job_id))
593 .count();
594
595 ready_executions.push(ora_storage::ReadyExecution {
596 id: ready_execution.id,
597 job_id: ready_execution.job_id,
598 input_payload_json: job_data.input_payload_json.as_str().into(),
599 attempt_number: u64::try_from(job_execution_count)?,
600 job_type_id: job_data.job_type_id.as_str().into(),
601 target_execution_time: deserialize_systemtime(
602 ready_execution.target_execution_time,
603 ),
604 timeout_policy: deserialize_archived!(&job_data.timeout_policy)
605 .unwrap()
606 .into(),
607 });
608
609 Ok(())
610 };
611
612 if let Some(after) = after {
613 for (i, ready_execution) in partitions
614 .ready_executions
615 .read(&tx)
616 .range(after..)
617 .enumerate()
618 {
619 let (execution_id, execution_data) = ready_execution?;
620 if execution_id.value() <= after {
621 continue;
622 }
623
624 add_execution(execution_data)?;
625
626 if i + 1 == BATCH_SIZE {
627 break;
628 }
629 }
630 } else {
631 for (i, ready_execution) in partitions.ready_executions.read(&tx).iter().enumerate()
632 {
633 add_execution(ready_execution?.1)?;
634
635 if i + 1 == BATCH_SIZE {
636 break;
637 }
638 }
639 }
640
641 Ok(ready_executions)
642 })
643 .await
644 }
645
646 async fn pending_jobs(
647 &self,
648 after: Option<Uuid>,
649 ) -> eyre::Result<Vec<ora_storage::PendingJob>> {
650 self.read(move |tx, partitions| {
651 const BATCH_SIZE: usize = 10_000;
652
653 let mut pending_jobs = Vec::with_capacity(BATCH_SIZE);
654
655 for (i, pending_job) in partitions.idx_pending_jobs.read(&tx).iter().enumerate() {
656 let (job_id, _) = pending_job?;
657 let job_id = job_id.value();
658
659 if let Some(after) = after {
661 if job_id <= after {
662 continue;
663 }
664 }
665
666 let Some(job_data) = partitions.active_jobs.read(&tx).get(&job_id)? else {
667 continue;
668 };
669 let job_data = job_data.value();
670
671 let execution_count = partitions
672 .idx_job_executions
673 .read(&tx)
674 .prefix(&indexes::JobExecutionIndexKey::new_prefix(job_id))
675 .count();
676
677 pending_jobs.push(ora_storage::PendingJob {
678 id: job_id,
679 target_execution_time: deserialize_systemtime(job_data.target_execution_time),
680 execution_count: u64::try_from(execution_count)?,
681 retry_policy: deserialize_archived!(&job_data.retry_policy)
682 .unwrap()
683 .into(),
684 timeout_policy: deserialize_archived!(&job_data.timeout_policy)
685 .unwrap()
686 .into(),
687 });
688
689 if i + 1 == BATCH_SIZE {
690 break;
691 }
692 }
693
694 Ok(pending_jobs)
695 })
696 .await
697 }
698
699 async fn query_jobs(
700 &self,
701 cursor: Option<String>,
702 limit: usize,
703 order: ora_storage::JobQueryOrder,
704 filters: ora_storage::JobQueryFilters,
705 ) -> eyre::Result<ora_storage::JobQueryResult> {
706 self.read(move |tx, partitions| {
707 let cursor: Option<job_query::Cursor> = match cursor {
708 Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
709 None => None,
710 };
711
712 job_query::query_jobs(&tx, partitions, cursor, limit, filters, order)
713 })
714 .await
715 }
716
717 async fn query_job_ids(
718 &self,
719 filters: ora_storage::JobQueryFilters,
720 ) -> eyre::Result<Vec<Uuid>> {
721 self.read(move |tx, partitions| job_query::query_job_ids(&tx, partitions, filters))
722 .await
723 }
724
725 async fn count_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<u64> {
726 self.read(move |tx, partitions| job_query::count_jobs(&tx, partitions, filters))
727 .await
728 }
729
730 async fn query_job_types(&self) -> eyre::Result<Vec<ora_storage::JobType>> {
731 self.read(|tx, partitions| {
732 let mut job_types = Vec::new();
733
734 for job_type in partitions.job_types.read(&tx).iter() {
735 let (job_type_id, job_type) = job_type?;
736 let job_type = job_type.value();
737
738 job_types.push(ora_storage::JobType {
739 id: job_type_id.value().to_string(),
740 name: job_type.name.as_str().into(),
741 description: job_type.description.as_str().into(),
742 input_schema_json: job_type
743 .input_schema_json
744 .as_ref()
745 .map(|s| s.as_str().into()),
746 output_schema_json: job_type
747 .output_schema_json
748 .as_ref()
749 .map(|s| s.as_str().into()),
750 });
751 }
752
753 Ok(job_types)
754 })
755 .await
756 }
757
758 async fn delete_jobs(
759 &self,
760 mut filters: ora_storage::JobQueryFilters,
761 ) -> eyre::Result<Vec<Uuid>> {
762 filters.active = Some(false);
768
769 let ids_to_delete = self
770 .read(move |tx, partitions| job_query::query_job_ids(&tx, partitions, filters))
771 .await?;
772
773 self.write(move |mut tx, partitions| {
774 let mut deleted_jobs = Vec::with_capacity(ids_to_delete.len());
775
776 for job_id in ids_to_delete {
777 if delete_job(partitions, &mut tx, job_id)? {
778 deleted_jobs.push(job_id);
779 }
780 }
781
782 tx.commit()?;
783
784 Ok(deleted_jobs)
785 })
786 .await
787 }
788
789 async fn schedules_added(&self, schedules: Vec<ora_storage::NewSchedule>) -> eyre::Result<()> {
790 self.write(move |mut tx, partitions| {
791 for schedule in schedules {
792 let schedule_data = ScheduleData::from(schedule);
793
794 partitions
795 .active_schedules
796 .write(&mut tx)
797 .insert(&schedule_data.id, &schedule_data);
798
799 for (key, value) in schedule_data.labels {
800 let label_key = indexes::LabelIndexKey::new(&key, &value, schedule_data.id);
801 partitions
802 .idx_schedule_labels
803 .write(&mut tx)
804 .insert(&label_key, &());
805 }
806
807 partitions
808 .idx_pending_schedules
809 .write(&mut tx)
810 .insert(&schedule_data.id, &());
811 }
812
813 tx.commit()?;
814
815 Ok(())
816 })
817 .await
818 }
819
820 async fn schedules_cancelled(
821 &self,
822 schedule_ids: &[Uuid],
823 timestamp: SystemTime,
824 ) -> eyre::Result<Vec<ora_storage::CancelledSchedule>> {
825 let mut schedule_ids: Vec<_> = schedule_ids.into();
826 schedule_ids.sort();
827
828 self.write(move |mut tx, partitions| {
829 let mut cancelled_schedules = Vec::with_capacity(schedule_ids.len());
830
831 for schedule_id in schedule_ids {
832 if let Some(schedule_data) = partitions
833 .active_schedules
834 .write(&mut tx)
835 .get(&schedule_id)?
836 {
837 let schedule_data = schedule_data.value();
838
839 if schedule_data.cancelled_at.is_some() {
840 continue;
841 }
842
843 let mut schedule_data = deserialize!(schedule_data)?;
844 schedule_data.cancelled_at = Some(timestamp);
845
846 partitions
847 .active_schedules
848 .write(&mut tx)
849 .insert(&schedule_id, &schedule_data);
850 schedule_unschedulable(partitions, &mut tx, schedule_id, timestamp)?;
851
852 cancelled_schedules.push(ora_storage::CancelledSchedule { id: schedule_id });
853 }
854 }
855
856 tx.commit()?;
857
858 Ok(cancelled_schedules)
859 })
860 .await
861 }
862
863 async fn schedules_unschedulable(
864 &self,
865 schedule_ids: &[Uuid],
866 timestamp: SystemTime,
867 ) -> eyre::Result<()> {
868 let mut schedule_ids: Vec<_> = schedule_ids.into();
869 schedule_ids.sort();
870
871 self.write(move |mut tx, partitions| {
872 for schedule_id in schedule_ids {
873 schedule_unschedulable(partitions, &mut tx, schedule_id, timestamp)?;
874 }
875
876 tx.commit()?;
877
878 Ok(())
879 })
880 .await
881 }
882
883 async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>> {
884 self.read(move |tx, partitions| {
885 const BATCH_SIZE: usize = 10_000;
886
887 let mut pending_schedules = Vec::with_capacity(BATCH_SIZE);
888
889 for (i, pending_schedule) in partitions
890 .idx_pending_schedules
891 .read(&tx)
892 .iter()
893 .enumerate()
894 {
895 let (schedule_id, _) = pending_schedule?;
896 let schedule_id = schedule_id.value();
897
898 if let Some(after) = after {
900 if schedule_id <= after {
901 continue;
902 }
903 }
904
905 let Some(schedule_data) =
906 partitions.active_schedules.read(&tx).get(&schedule_id)?
907 else {
908 continue;
909 };
910 let schedule_data = schedule_data.value();
911
912 let last_schedule_job_id = partitions
913 .idx_schedule_jobs
914 .read(&tx)
915 .prefix(&indexes::ScheduleJobIndexKey::new_prefix(schedule_id))
916 .last()
917 .transpose()?
918 .map(|(job_id, _)| job_id.value().job_id.unwrap());
919
920 let last_target_execution_time = match last_schedule_job_id {
921 Some(job_id) => {
922 let job_data = partitions.inactive_jobs.read(&tx).get(&job_id)?;
923 let job_data = match job_data {
924 Some(job_data) => Some(job_data),
925 None => partitions.active_jobs.read(&tx).get(&job_id)?,
926 };
927
928 match job_data {
929 Some(job_data) => {
930 let job_data = job_data.value();
931
932 Some(job_data.target_execution_time)
933 }
934 None => None,
935 }
936 }
937 None => None,
938 };
939
940 pending_schedules.push(ora_storage::PendingSchedule {
941 id: schedule_id,
942 job_timing_policy: deserialize!(&schedule_data.job_timing_policy)?.into(),
943 job_creation_policy: deserialize!(&schedule_data.job_creation_policy)?.into(),
944 last_target_execution_time: last_target_execution_time
945 .map(deserialize_systemtime),
946 time_range: match schedule_data.time_range.as_ref() {
947 Some(time_range) => Some(deserialize!(time_range)?.into()),
948 None => None,
949 },
950 });
951
952 if i + 1 == BATCH_SIZE {
953 break;
954 }
955 }
956
957 Ok(pending_schedules)
958 })
959 .await
960 }
961
962 async fn query_schedules(
963 &self,
964 cursor: Option<String>,
965 limit: usize,
966 filters: ora_storage::ScheduleQueryFilters,
967 order: ora_storage::ScheduleQueryOrder,
968 ) -> eyre::Result<ora_storage::ScheduleQueryResult> {
969 self.read(move |tx, partitions| {
970 let cursor: Option<schedule_query::Cursor> = match cursor {
971 Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
972 None => None,
973 };
974
975 schedule_query::query_schedules(&tx, partitions, cursor, limit, filters, order)
976 })
977 .await
978 }
979
980 async fn query_schedule_ids(
981 &self,
982 filters: ora_storage::ScheduleQueryFilters,
983 ) -> eyre::Result<Vec<Uuid>> {
984 self.read(move |tx, partitions| {
985 schedule_query::query_schedule_ids(&tx, partitions, filters)
986 })
987 .await
988 }
989
990 async fn count_schedules(
991 &self,
992 filters: ora_storage::ScheduleQueryFilters,
993 ) -> eyre::Result<u64> {
994 self.read(move |tx, partitions| schedule_query::count_schedules(&tx, partitions, filters))
995 .await
996 }
997
998 async fn delete_schedules(
999 &self,
1000 mut filters: ora_storage::ScheduleQueryFilters,
1001 ) -> eyre::Result<Vec<Uuid>> {
1002 filters.active = Some(false);
1008
1009 let ids_to_delete = self
1010 .read(move |tx, partitions| {
1011 schedule_query::query_schedule_ids(&tx, partitions, filters)
1012 })
1013 .await?;
1014
1015 self.write(move |mut tx, partitions| {
1016 let mut deleted_schedules = Vec::with_capacity(ids_to_delete.len());
1017
1018 for schedule_id in ids_to_delete {
1019 let Some(schedule_data) = partitions
1021 .inactive_schedules
1022 .write(&mut tx)
1023 .take(&schedule_id)?
1024 else {
1025 continue;
1026 };
1027
1028 deleted_schedules.push(schedule_id);
1029
1030 let schedule_data = schedule_data.value();
1031
1032 for (key, value) in schedule_data.labels.iter() {
1034 let label_key = indexes::LabelIndexKey::new(key, value, schedule_id);
1035 partitions
1036 .idx_schedule_labels
1037 .write(&mut tx)
1038 .remove(&label_key);
1039 }
1040
1041 let mut job_ids = Vec::new();
1043
1044 for schedule_job in partitions
1045 .idx_schedule_jobs
1046 .write(&mut tx)
1047 .prefix(&indexes::ScheduleJobIndexKey::new_prefix(schedule_id))
1048 {
1049 let (job_id, _) = schedule_job?;
1050 let job_id = job_id.value().job_id.unwrap();
1051
1052 job_ids.push(job_id);
1053 }
1054
1055 for job_id in job_ids {
1056 delete_job(partitions, &mut tx, job_id)?;
1057 }
1058 }
1059
1060 tx.commit()?;
1061
1062 Ok(deleted_schedules)
1063 })
1064 .await
1065 }
1066
/// Conditional job insertion is not implemented by this backend.
///
/// # Errors
///
/// Always returns a "not supported" error; both arguments are ignored.
async fn job_added_conditionally(
    &self,
    _job: ora_storage::NewJob,
    _filters: ora_storage::JobQueryFilters,
) -> eyre::Result<ora_storage::ConditionalJobResult> {
    bail!("not supported")
}
1074
/// Conditional schedule insertion is not implemented by this backend.
///
/// # Errors
///
/// Always returns a "not supported" error; both arguments are ignored.
async fn schedule_added_conditionally(
    &self,
    _schedule: ora_storage::NewSchedule,
    _filters: ora_storage::ScheduleQueryFilters,
) -> eyre::Result<ora_storage::ConditionalScheduleResult> {
    bail!("not supported")
}
1082}
1083
1084fn delete_job<'a>(
1085 partitions: &'a Partitions,
1086 tx: &mut WriteTransaction<'a>,
1087 job_id: Uuid,
1088) -> Result<bool, eyre::Error> {
1089 let Some(job_data) = partitions.inactive_jobs.write(tx).take(&job_id)? else {
1090 return Ok(false);
1091 };
1092 let job_data = job_data.value();
1093 let mut execution_ids = Vec::new();
1094 for job_execution in partitions
1095 .idx_job_executions
1096 .write(tx)
1097 .prefix(&indexes::JobExecutionIndexKey::new_prefix(job_id))
1098 {
1099 let (job_execution_id, _) = job_execution?;
1100 let job_execution_id = job_execution_id.value().execution_id.unwrap();
1101
1102 execution_ids.push(job_execution_id);
1103 }
1104 for job_execution_id in execution_ids {
1105 partitions
1106 .succeeded_executions
1107 .write(tx)
1108 .remove(&job_execution_id);
1109 partitions
1110 .failed_executions
1111 .write(tx)
1112 .remove(&job_execution_id);
1113
1114 partitions
1115 .idx_job_executions
1116 .write(tx)
1117 .remove(&indexes::JobExecutionIndexKey::new(
1118 job_id,
1119 job_execution_id,
1120 ));
1121 }
1122 for (key, value) in job_data.labels.iter() {
1123 let label_key = indexes::LabelIndexKey::new(key, value, job_id);
1124 partitions.idx_job_labels.write(tx).remove(&label_key);
1125 }
1126
1127 if let Some(schedule_id) = partitions.idx_job_schedule.write(tx).get(&job_id)? {
1128 partitions
1129 .idx_schedule_jobs
1130 .write(tx)
1131 .remove(&indexes::ScheduleJobIndexKey::new(
1132 schedule_id.value(),
1133 job_id,
1134 ));
1135 partitions.idx_job_schedule.write(tx).remove(&job_id);
1136 }
1137
1138 Ok(true)
1139}
1140
1141fn job_unschedulable<'a>(
1142 partitions: &'a Partitions,
1143 tx: &mut WriteTransaction<'a>,
1144 job_id: Uuid,
1145 timestamp: SystemTime,
1146) -> Result<(), eyre::Error> {
1147 if let Some(job_data) = partitions.active_jobs.write(tx).take(&job_id)? {
1148 let mut job_data = deserialize!(job_data.value())?;
1149
1150 if job_data.marked_unschedulable_at.is_none() {
1151 job_data.marked_unschedulable_at = Some(timestamp);
1152 }
1153
1154 if partitions
1155 .idx_job_active_execution
1156 .write(tx)
1157 .contains_key(&job_id)?
1158 {
1159 partitions.active_jobs.write(tx).insert(&job_id, &job_data);
1162 } else {
1163 partitions
1164 .inactive_jobs
1165 .write(tx)
1166 .insert(&job_id, &job_data);
1167
1168 if let Some(schedule_id) = job_data.schedule_id {
1169 partitions
1170 .idx_schedule_active_job
1171 .write(tx)
1172 .remove(&schedule_id);
1173
1174 if let Some(schedule_data) =
1175 partitions.active_schedules.write(tx).get(&schedule_id)?
1176 {
1177 let schedule_data = schedule_data.value();
1178
1179 if schedule_data.cancelled_at.is_none() {
1182 partitions
1183 .idx_pending_schedules
1184 .write(tx)
1185 .insert(&schedule_id, &());
1186 }
1187 }
1188 }
1189 }
1190
1191 partitions.idx_pending_jobs.write(tx).remove(&job_id);
1192 }
1193 Ok(())
1194}
1195
1196fn schedule_unschedulable<'a>(
1197 partitions: &'a Partitions,
1198 tx: &mut WriteTransaction<'a>,
1199 schedule_id: Uuid,
1200 timestamp: SystemTime,
1201) -> Result<(), eyre::Error> {
1202 if let Some(schedule_data) = partitions.active_schedules.write(tx).take(&schedule_id)? {
1203 let mut schedule_data = deserialize!(schedule_data.value())?;
1204
1205 if schedule_data.marked_unschedulable_at.is_none() {
1206 schedule_data.marked_unschedulable_at = Some(timestamp);
1207 }
1208
1209 if partitions
1210 .idx_schedule_active_job
1211 .write(tx)
1212 .contains_key(&schedule_id)?
1213 {
1214 partitions
1217 .active_schedules
1218 .write(tx)
1219 .insert(&schedule_id, &schedule_data);
1220 } else {
1221 partitions
1222 .inactive_schedules
1223 .write(tx)
1224 .insert(&schedule_id, &schedule_data);
1225 }
1226
1227 partitions
1228 .idx_pending_schedules
1229 .write(tx)
1230 .remove(&schedule_id);
1231 }
1232 Ok(())
1233}