ora_storage_fjall/
lib.rs

//! A persistent storage implementation based on the fjall storage engine.

use std::{mem, time::SystemTime};

use async_trait::async_trait;
use eyre::{bail, Context};
use fjall::{PersistMode, ReadTransaction, TxKeyspace, WriteTransaction};
use models::{ExecutionData, JobData, JobTypeData, ScheduleData};
use ora_storage::{IndexSet, PendingSchedule, Storage};
use partitions::Partitions;
use tokio::task::spawn_blocking;
use typed::Raw;
use util::deserialize_systemtime;
use uuid::Uuid;

#[macro_use]
pub(crate) mod util;

mod indexes;
mod job_query;
mod models;
mod partitions;
mod schedule_query;
mod snapshot;
mod typed;

/// Configuration for the fjall storage engine.
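///
/// # Examples
///
/// A minimal sketch of opening a storage instance; the data path, the crate
/// import path, and the `fjall::Config::new` constructor are assumptions
/// for illustration:
///
/// ```no_run
/// use ora_storage_fjall::{FjallStorage, FjallStorageConfig};
///
/// # fn main() -> eyre::Result<()> {
/// let config = FjallStorageConfig::new(fjall::Config::new("/tmp/ora-data"))
///     // Trade durability for commit throughput; buffered writes may be
///     // lost if the process crashes before the OS flushes them.
///     .transaction_durability(Some(fjall::PersistMode::Buffer));
/// let storage = FjallStorage::new(config)?;
/// # let _ = storage;
/// # Ok(())
/// # }
/// ```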
#[must_use]
pub struct FjallStorageConfig {
    fjall_config: fjall::Config,
    durability: Option<PersistMode>,
}

impl FjallStorageConfig {
    /// Creates a new configuration for the fjall storage engine.
    pub fn new(fjall_config: fjall::Config) -> Self {
        Self {
            fjall_config,
            durability: Some(PersistMode::SyncAll),
        }
    }

    /// Sets the durability mode to use for transactions.
    pub fn transaction_durability(mut self, durability: Option<PersistMode>) -> Self {
        self.durability = durability;
        self
    }
}

/// A persistent storage implementation based on the fjall storage engine.
#[derive(Clone)]
pub struct FjallStorage {
    keyspace: TxKeyspace,
    /// The durability mode to use for transactions.
    durability: Option<PersistMode>,
    partitions: Partitions,
}

impl std::fmt::Debug for FjallStorage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FjallStorage").finish_non_exhaustive()
    }
}

impl FjallStorage {
    /// Creates a new fjall storage instance.
    pub fn new(config: FjallStorageConfig) -> eyre::Result<Self> {
        let keyspace = config.fjall_config.open_transactional()?;

        let partitions = Partitions::new(&keyspace)?;

        Ok(Self {
            partitions,
            keyspace,
            durability: config.durability,
        })
    }

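    /// Runs a write closure on a dedicated blocking thread.
    ///
    /// Fjall transactions are synchronous, so the closure is moved off the
    /// async runtime via `spawn_blocking`; the transaction is created with
    /// the configured durability mode.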
    async fn write<F, T>(&self, f: F) -> eyre::Result<T>
    where
        F: for<'a> FnOnce(WriteTransaction<'a>, &'a Partitions) -> eyre::Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let keyspace = self.keyspace.clone();
        let partitions = self.partitions.clone();
        let durability = self.durability;

        spawn_blocking(move || f(keyspace.write_tx().durability(durability), &partitions))
            .await
            .map_err(eyre::Report::from)?
    }

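    /// Runs a read-only transaction closure on a dedicated blocking thread.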
    async fn read<F, T>(&self, f: F) -> eyre::Result<T>
    where
        F: for<'a> FnOnce(ReadTransaction, &'a Partitions) -> eyre::Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let keyspace = self.keyspace.clone();
        let partitions = self.partitions.clone();

        spawn_blocking(move || f(keyspace.read_tx(), &partitions))
            .await
            .map_err(eyre::Report::from)?
    }
}

#[async_trait]
impl Storage for FjallStorage {
    async fn job_types_added(&self, job_types: Vec<ora_storage::JobType>) -> eyre::Result<()> {
        self.write(|mut tx, partitions| {
            for mut job_type in job_types {
                let job_type_id = mem::take(&mut job_type.id);
                let job_type_data = JobTypeData::from(job_type);

                partitions
                    .job_types
                    .write(&mut tx)
                    .insert(&job_type_id, &job_type_data);
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn jobs_added(&self, jobs: Vec<ora_storage::NewJob>) -> eyre::Result<()> {
        self.write(|mut tx, partitions| {
            for job in jobs {
                let job_data = JobData::from(job);

                partitions
                    .active_jobs
                    .write(&mut tx)
                    .insert(&job_data.id, &job_data);

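                // Maintain the secondary indexes: labels, the pending-jobs
                // set, and, for scheduled jobs, the schedule/job links.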
                for (key, value) in job_data.labels {
                    let label_key = indexes::LabelIndexKey::new(&key, &value, job_data.id);
                    partitions
                        .idx_job_labels
                        .write(&mut tx)
                        .insert(&label_key, &());
                }

                partitions
                    .idx_pending_jobs
                    .write(&mut tx)
                    .insert(&job_data.id, &());

                if let Some(schedule_id) = job_data.schedule_id {
                    let schedule_job_key =
                        indexes::ScheduleJobIndexKey::new(schedule_id, job_data.id);
                    partitions
                        .idx_schedule_jobs
                        .write(&mut tx)
                        .insert(&schedule_job_key, &());
                    partitions
                        .idx_schedule_active_job
                        .write(&mut tx)
                        .insert(&schedule_id, &job_data.id);
                    partitions
                        .idx_job_schedule
                        .write(&mut tx)
                        .insert(&job_data.id, &schedule_id);
                    partitions
                        .idx_pending_schedules
                        .write(&mut tx)
                        .remove(&schedule_id);
                }
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn jobs_cancelled(
        &self,
        job_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<ora_storage::CancelledJob>> {
        let mut job_ids: Vec<_> = job_ids.into();
        job_ids.sort();

        self.write(move |mut tx, partitions| {
            let mut cancelled_jobs = Vec::with_capacity(job_ids.len());

            for job_id in job_ids {
                if let Some(job_data) = partitions.active_jobs.write(&mut tx).get(&job_id)? {
                    let job_data = job_data.value();

                    if job_data.cancelled_at.is_some() {
                        continue;
                    }

                    let mut job_data = deserialize!(job_data)?;
                    job_data.cancelled_at = Some(timestamp);

                    partitions
                        .active_jobs
                        .write(&mut tx)
                        .insert(&job_id, &job_data);
                    job_unschedulable(partitions, &mut tx, job_id, timestamp)?;

                    let active_execution = partitions
                        .idx_job_active_execution
                        .write(&mut tx)
                        .get(&job_id)?
                        .map(|id| id.value());

                    cancelled_jobs.push(ora_storage::CancelledJob {
                        id: job_id,
                        active_execution,
                    });
                }
            }

            tx.commit()?;

            Ok(cancelled_jobs)
        })
        .await
    }

    async fn executions_added(
        &self,
        executions: Vec<ora_storage::NewExecution>,
        timestamp: SystemTime,
    ) -> eyre::Result<()> {
        self.write(move |mut tx, partitions| {
            for execution in executions {
                let Some(job) = partitions
                    .active_jobs
                    .write(&mut tx)
                    .get(&execution.job_id)?
                else {
                    continue;
                };

                let target_execution_time =
                    deserialize_systemtime(job.value().target_execution_time);

                let execution_data = ExecutionData {
                    id: execution.id,
                    job_id: execution.job_id,
                    executor_id: None,
                    created_at: timestamp,
                    ready_at: None,
                    assigned_at: None,
                    started_at: None,
                    succeeded_at: None,
                    failed_at: None,
                    output_payload_json: None,
                    failure_reason: None,
                    target_execution_time,
                };
                partitions
                    .pending_executions
                    .write(&mut tx)
                    .insert(&execution.id, &execution_data);

                partitions.idx_job_executions.write(&mut tx).insert(
                    &indexes::JobExecutionIndexKey::new(execution.job_id, execution.id),
                    &(),
                );
                partitions
                    .idx_job_active_execution
                    .write(&mut tx)
                    .insert(&execution.job_id, &execution.id);

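                // The job now has an execution in flight, so it is no
                // longer pending.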
                partitions
                    .idx_pending_jobs
                    .write(&mut tx)
                    .remove(&execution.job_id);
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn executions_ready(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()> {
        let mut execution_ids: Vec<_> = execution_ids.into();
        execution_ids.sort();

        self.write(move |mut tx, partitions| {
            for execution_id in execution_ids {
                if let Some(execution_data) = partitions
                    .pending_executions
                    .write(&mut tx)
                    .take(&execution_id)?
                {
                    let mut execution_data = deserialize!(execution_data.value())?;
                    execution_data.ready_at = Some(timestamp);

                    partitions
                        .ready_executions
                        .write(&mut tx)
                        .insert(&execution_id, &execution_data);
                }
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn execution_assigned(
        &self,
        execution_id: Uuid,
        executor_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()> {
        self.write(move |mut tx, partitions| {
            if let Some(execution_data) = partitions
                .ready_executions
                .write(&mut tx)
                .take(&execution_id)?
            {
                let mut execution_data = deserialize!(execution_data.value())?;
                execution_data.assigned_at = Some(timestamp);
                execution_data.executor_id = Some(executor_id);

                partitions
                    .assigned_executions
                    .write(&mut tx)
                    .insert(&execution_id, &execution_data);
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn execution_started(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()> {
        self.write(move |mut tx, partitions| {
            if let Some(execution_data) = partitions
                .assigned_executions
                .write(&mut tx)
                .take(&execution_id)?
            {
                let mut execution_data = deserialize!(execution_data.value())?;
                execution_data.started_at = Some(timestamp);

                partitions
                    .running_executions
                    .write(&mut tx)
                    .insert(&execution_id, &execution_data);
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn execution_succeeded(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
        output_payload_json: String,
    ) -> eyre::Result<()> {
        self.write(move |mut tx, partitions| {
            if let Some(execution_data) = partitions
                .running_executions
                .write(&mut tx)
                .take(&execution_id)?
            {
                let mut execution_data = deserialize!(execution_data.value())?;
                execution_data.succeeded_at = Some(timestamp);
                execution_data.output_payload_json = Some(output_payload_json);

                partitions
                    .succeeded_executions
                    .write(&mut tx)
                    .insert(&execution_id, &execution_data);

                partitions
                    .idx_job_active_execution
                    .write(&mut tx)
                    .remove(&execution_data.job_id);

                job_unschedulable(partitions, &mut tx, execution_data.job_id, timestamp)?;
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn executions_failed(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
        reason: String,
        mark_job_unschedulable: bool,
    ) -> eyre::Result<()> {
        let mut execution_ids: Vec<_> = execution_ids.into();
        execution_ids.sort();

        self.write(move |mut tx, partitions| {
            // Executions may fail from any prior state,
            // so we loop through all partitions.
            let search_partitions = [
                &partitions.running_executions,
                &partitions.assigned_executions,
                &partitions.ready_executions,
                &partitions.pending_executions,
            ];

            for execution_id in execution_ids {
                'partition_search: for partition in search_partitions {
                    if let Some(execution_data) = partition.write(&mut tx).take(&execution_id)? {
                        let mut execution_data = deserialize!(execution_data.value())?;
                        execution_data.failed_at = Some(timestamp);
                        execution_data.failure_reason = Some(reason.clone());

                        partitions
                            .failed_executions
                            .write(&mut tx)
                            .insert(&execution_id, &execution_data);

                        partitions
                            .idx_job_active_execution
                            .write(&mut tx)
                            .remove(&execution_data.job_id);

                        if mark_job_unschedulable {
                            job_unschedulable(
                                partitions,
                                &mut tx,
                                execution_data.job_id,
                                timestamp,
                            )?;
                        } else {
                            partitions
                                .idx_pending_jobs
                                .write(&mut tx)
                                .insert(&execution_data.job_id, &());
                        }

                        break 'partition_search;
                    }
                }
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>> {
        let executor_ids: IndexSet<_> = executor_ids.iter().copied().collect();

        self.read(move |tx, partitions| {
            let mut orphaned_execution_ids = Vec::new();

            let search_partitions = [
                &partitions.assigned_executions,
                &partitions.running_executions,
            ];

            for partition in search_partitions {
                for execution in partition.read(&tx).iter() {
                    let (_, execution) = execution?;
                    let execution = execution.value();

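                    // Assigned and running executions always have an
                    // executor id (set in `execution_assigned`), so this
                    // unwrap cannot fail for well-formed data.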
                    if !executor_ids.contains(&execution.executor_id.unwrap()) {
                        orphaned_execution_ids.push(execution.id);
                    }
                }
            }

            Ok(orphaned_execution_ids)
        })
        .await
    }

    async fn jobs_unschedulable(
        &self,
        job_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()> {
        let mut job_ids: Vec<_> = job_ids.into();
        job_ids.sort();

        self.write(move |mut tx, partitions| {
            for job_id in job_ids {
                job_unschedulable(partitions, &mut tx, job_id, timestamp)?;
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn pending_executions(
        &self,
        after: Option<Uuid>,
    ) -> eyre::Result<Vec<ora_storage::PendingExecution>> {
        self.read(move |tx, partitions| {
            let mut pending_executions = Vec::new();

            if let Some(after) = after {
                for pending_execution in partitions.pending_executions.read(&tx).range(after..) {
                    let (_, pending_execution) = pending_execution?;
                    let pending_execution = pending_execution.value();

                    if pending_execution.id <= after {
                        continue;
                    }

                    pending_executions.push(ora_storage::PendingExecution {
                        id: pending_execution.id,
                        target_execution_time: deserialize_systemtime(
                            pending_execution.target_execution_time,
                        ),
                    });
                }
            } else {
                for pending_execution in partitions.pending_executions.read(&tx).iter() {
                    let (_, pending_execution) = pending_execution?;
                    let pending_execution = pending_execution.value();

                    pending_executions.push(ora_storage::PendingExecution {
                        id: pending_execution.id,
                        target_execution_time: deserialize_systemtime(
                            pending_execution.target_execution_time,
                        ),
                    });
                }
            }

            Ok(pending_executions)
        })
        .await
    }

    async fn ready_executions(
        &self,
        after: Option<Uuid>,
    ) -> eyre::Result<Vec<ora_storage::ReadyExecution>> {
        self.read(move |tx, partitions| {
            const BATCH_SIZE: usize = 10_000;

            let mut ready_executions = Vec::with_capacity(BATCH_SIZE);

            let mut add_execution = |ready_execution: Raw<ExecutionData>| -> eyre::Result<()> {
                let ready_execution = ready_execution.value();

                let job_id = ready_execution.job_id;
                let job_data = if let Some(job_data) =
                    partitions.active_jobs.read(&tx).get(&job_id)?
                {
                    job_data
                } else if let Some(job_data) = partitions.inactive_jobs.read(&tx).get(&job_id)? {
                    job_data
                } else {
                    tracing::error!(%job_id, "found ready execution for missing job");
                    return Ok(());
                };
                let job_data = job_data.value();

                let job_execution_count = partitions
                    .idx_job_executions
                    .read(&tx)
                    .prefix(&indexes::JobExecutionIndexKey::new_prefix(job_id))
                    .count();

                ready_executions.push(ora_storage::ReadyExecution {
                    id: ready_execution.id,
                    job_id: ready_execution.job_id,
                    input_payload_json: job_data.input_payload_json.as_str().into(),
                    attempt_number: u64::try_from(job_execution_count)?,
                    job_type_id: job_data.job_type_id.as_str().into(),
                    target_execution_time: deserialize_systemtime(
                        ready_execution.target_execution_time,
                    ),
                    timeout_policy: deserialize_archived!(&job_data.timeout_policy)
                        .unwrap()
                        .into(),
                });

                Ok(())
            };

            if let Some(after) = after {
                for (i, ready_execution) in partitions
                    .ready_executions
                    .read(&tx)
                    .range(after..)
                    .enumerate()
                {
                    let (execution_id, execution_data) = ready_execution?;
                    if execution_id.value() <= after {
                        continue;
                    }

                    add_execution(execution_data)?;

                    if i + 1 == BATCH_SIZE {
                        break;
                    }
                }
            } else {
                for (i, ready_execution) in partitions.ready_executions.read(&tx).iter().enumerate()
                {
                    add_execution(ready_execution?.1)?;

                    if i + 1 == BATCH_SIZE {
                        break;
                    }
                }
            }

            Ok(ready_executions)
        })
        .await
    }

    async fn pending_jobs(
        &self,
        after: Option<Uuid>,
    ) -> eyre::Result<Vec<ora_storage::PendingJob>> {
        self.read(move |tx, partitions| {
            const BATCH_SIZE: usize = 10_000;

            let mut pending_jobs = Vec::with_capacity(BATCH_SIZE);

            for (i, pending_job) in partitions.idx_pending_jobs.read(&tx).iter().enumerate() {
                let (job_id, _) = pending_job?;
                let job_id = job_id.value();

                // FIXME(perf): use a range query instead of filtering
                if let Some(after) = after {
                    if job_id <= after {
                        continue;
                    }
                }

                let Some(job_data) = partitions.active_jobs.read(&tx).get(&job_id)? else {
                    continue;
                };
                let job_data = job_data.value();

                let execution_count = partitions
                    .idx_job_executions
                    .read(&tx)
                    .prefix(&indexes::JobExecutionIndexKey::new_prefix(job_id))
                    .count();

                pending_jobs.push(ora_storage::PendingJob {
                    id: job_id,
                    target_execution_time: deserialize_systemtime(job_data.target_execution_time),
                    execution_count: u64::try_from(execution_count)?,
                    retry_policy: deserialize_archived!(&job_data.retry_policy)
                        .unwrap()
                        .into(),
                    timeout_policy: deserialize_archived!(&job_data.timeout_policy)
                        .unwrap()
                        .into(),
                });

                if i + 1 == BATCH_SIZE {
                    break;
                }
            }

            Ok(pending_jobs)
        })
        .await
    }

    async fn query_jobs(
        &self,
        cursor: Option<String>,
        limit: usize,
        order: ora_storage::JobQueryOrder,
        filters: ora_storage::JobQueryFilters,
    ) -> eyre::Result<ora_storage::JobQueryResult> {
        self.read(move |tx, partitions| {
            let cursor: Option<job_query::Cursor> = match cursor {
                Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
                None => None,
            };

            job_query::query_jobs(&tx, partitions, cursor, limit, filters, order)
        })
        .await
    }

    async fn query_job_ids(
        &self,
        filters: ora_storage::JobQueryFilters,
    ) -> eyre::Result<Vec<Uuid>> {
        self.read(move |tx, partitions| job_query::query_job_ids(&tx, partitions, filters))
            .await
    }

    async fn count_jobs(&self, filters: ora_storage::JobQueryFilters) -> eyre::Result<u64> {
        self.read(move |tx, partitions| job_query::count_jobs(&tx, partitions, filters))
            .await
    }

    async fn query_job_types(&self) -> eyre::Result<Vec<ora_storage::JobType>> {
        self.read(|tx, partitions| {
            let mut job_types = Vec::new();

            for job_type in partitions.job_types.read(&tx).iter() {
                let (job_type_id, job_type) = job_type?;
                let job_type = job_type.value();

                job_types.push(ora_storage::JobType {
                    id: job_type_id.value().to_string(),
                    name: job_type.name.as_str().into(),
                    description: job_type.description.as_str().into(),
                    input_schema_json: job_type
                        .input_schema_json
                        .as_ref()
                        .map(|s| s.as_str().into()),
                    output_schema_json: job_type
                        .output_schema_json
                        .as_ref()
                        .map(|s| s.as_str().into()),
                });
            }

            Ok(job_types)
        })
        .await
    }

    async fn delete_jobs(
        &self,
        mut filters: ora_storage::JobQueryFilters,
    ) -> eyre::Result<Vec<Uuid>> {
        // Deleting active jobs may cause all sorts of issues,
        // so we don't allow it.
        //
        // The server itself should also set this filter; we're
        // just being extra cautious here.
        filters.active = Some(false);

        let ids_to_delete = self
            .read(move |tx, partitions| job_query::query_job_ids(&tx, partitions, filters))
            .await?;

        self.write(move |mut tx, partitions| {
            let mut deleted_jobs = Vec::with_capacity(ids_to_delete.len());

            for job_id in ids_to_delete {
                if delete_job(partitions, &mut tx, job_id)? {
                    deleted_jobs.push(job_id);
                }
            }

            tx.commit()?;

            Ok(deleted_jobs)
        })
        .await
    }

    async fn schedules_added(&self, schedules: Vec<ora_storage::NewSchedule>) -> eyre::Result<()> {
        self.write(move |mut tx, partitions| {
            for schedule in schedules {
                let schedule_data = ScheduleData::from(schedule);

                partitions
                    .active_schedules
                    .write(&mut tx)
                    .insert(&schedule_data.id, &schedule_data);

                for (key, value) in schedule_data.labels {
                    let label_key = indexes::LabelIndexKey::new(&key, &value, schedule_data.id);
                    partitions
                        .idx_schedule_labels
                        .write(&mut tx)
                        .insert(&label_key, &());
                }

                partitions
                    .idx_pending_schedules
                    .write(&mut tx)
                    .insert(&schedule_data.id, &());
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn schedules_cancelled(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<ora_storage::CancelledSchedule>> {
        let mut schedule_ids: Vec<_> = schedule_ids.into();
        schedule_ids.sort();

        self.write(move |mut tx, partitions| {
            let mut cancelled_schedules = Vec::with_capacity(schedule_ids.len());

            for schedule_id in schedule_ids {
                if let Some(schedule_data) = partitions
                    .active_schedules
                    .write(&mut tx)
                    .get(&schedule_id)?
                {
                    let schedule_data = schedule_data.value();

                    if schedule_data.cancelled_at.is_some() {
                        continue;
                    }

                    let mut schedule_data = deserialize!(schedule_data)?;
                    schedule_data.cancelled_at = Some(timestamp);

                    partitions
                        .active_schedules
                        .write(&mut tx)
                        .insert(&schedule_id, &schedule_data);
                    schedule_unschedulable(partitions, &mut tx, schedule_id, timestamp)?;

                    cancelled_schedules.push(ora_storage::CancelledSchedule { id: schedule_id });
                }
            }

            tx.commit()?;

            Ok(cancelled_schedules)
        })
        .await
    }

    async fn schedules_unschedulable(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()> {
        let mut schedule_ids: Vec<_> = schedule_ids.into();
        schedule_ids.sort();

        self.write(move |mut tx, partitions| {
            for schedule_id in schedule_ids {
                schedule_unschedulable(partitions, &mut tx, schedule_id, timestamp)?;
            }

            tx.commit()?;

            Ok(())
        })
        .await
    }

    async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>> {
        self.read(move |tx, partitions| {
            const BATCH_SIZE: usize = 10_000;

            let mut pending_schedules = Vec::with_capacity(BATCH_SIZE);

            for (i, pending_schedule) in partitions
                .idx_pending_schedules
                .read(&tx)
                .iter()
                .enumerate()
            {
                let (schedule_id, _) = pending_schedule?;
                let schedule_id = schedule_id.value();

                // FIXME(perf): use a range query instead of filtering
                if let Some(after) = after {
                    if schedule_id <= after {
                        continue;
                    }
                }

                let Some(schedule_data) =
                    partitions.active_schedules.read(&tx).get(&schedule_id)?
                else {
                    continue;
                };
                let schedule_data = schedule_data.value();

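                // Look up the last job recorded for this schedule (greatest
                // key under the prefix) and fetch its target execution time.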
                let last_schedule_job_id = partitions
                    .idx_schedule_jobs
                    .read(&tx)
                    .prefix(&indexes::ScheduleJobIndexKey::new_prefix(schedule_id))
                    .last()
                    .transpose()?
                    .map(|(job_id, _)| job_id.value().job_id.unwrap());

                let last_target_execution_time = match last_schedule_job_id {
                    Some(job_id) => {
                        let job_data = partitions.inactive_jobs.read(&tx).get(&job_id)?;
                        let job_data = match job_data {
                            Some(job_data) => Some(job_data),
                            None => partitions.active_jobs.read(&tx).get(&job_id)?,
                        };

                        match job_data {
                            Some(job_data) => {
                                let job_data = job_data.value();

                                Some(job_data.target_execution_time)
                            }
                            None => None,
                        }
                    }
                    None => None,
                };

                pending_schedules.push(ora_storage::PendingSchedule {
                    id: schedule_id,
                    job_timing_policy: deserialize!(&schedule_data.job_timing_policy)?.into(),
                    job_creation_policy: deserialize!(&schedule_data.job_creation_policy)?.into(),
                    last_target_execution_time: last_target_execution_time
                        .map(deserialize_systemtime),
                    time_range: match schedule_data.time_range.as_ref() {
                        Some(time_range) => Some(deserialize!(time_range)?.into()),
                        None => None,
                    },
                });

                if i + 1 == BATCH_SIZE {
                    break;
                }
            }

            Ok(pending_schedules)
        })
        .await
    }

    async fn query_schedules(
        &self,
        cursor: Option<String>,
        limit: usize,
        filters: ora_storage::ScheduleQueryFilters,
        order: ora_storage::ScheduleQueryOrder,
    ) -> eyre::Result<ora_storage::ScheduleQueryResult> {
        self.read(move |tx, partitions| {
            let cursor: Option<schedule_query::Cursor> = match cursor {
                Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
                None => None,
            };

            schedule_query::query_schedules(&tx, partitions, cursor, limit, filters, order)
        })
        .await
    }

    async fn query_schedule_ids(
        &self,
        filters: ora_storage::ScheduleQueryFilters,
    ) -> eyre::Result<Vec<Uuid>> {
        self.read(move |tx, partitions| {
            schedule_query::query_schedule_ids(&tx, partitions, filters)
        })
        .await
    }

    async fn count_schedules(
        &self,
        filters: ora_storage::ScheduleQueryFilters,
    ) -> eyre::Result<u64> {
        self.read(move |tx, partitions| schedule_query::count_schedules(&tx, partitions, filters))
            .await
    }

    async fn delete_schedules(
        &self,
        mut filters: ora_storage::ScheduleQueryFilters,
    ) -> eyre::Result<Vec<Uuid>> {
        // Deleting active schedules may cause all sorts of issues,
        // so we don't allow it.
        //
        // The server itself should also set this filter; we're
        // just being extra cautious here.
        filters.active = Some(false);

        let ids_to_delete = self
            .read(move |tx, partitions| {
                schedule_query::query_schedule_ids(&tx, partitions, filters)
            })
            .await?;

        self.write(move |mut tx, partitions| {
            let mut deleted_schedules = Vec::with_capacity(ids_to_delete.len());

            for schedule_id in ids_to_delete {
                // Delete the schedule itself.
                let Some(schedule_data) = partitions
                    .inactive_schedules
                    .write(&mut tx)
                    .take(&schedule_id)?
                else {
                    continue;
                };

                deleted_schedules.push(schedule_id);

                let schedule_data = schedule_data.value();

                // Remove the schedule from indexes.
                for (key, value) in schedule_data.labels.iter() {
                    let label_key = indexes::LabelIndexKey::new(key, value, schedule_id);
                    partitions
                        .idx_schedule_labels
                        .write(&mut tx)
                        .remove(&label_key);
                }

                // Also cascade delete all jobs associated with the schedule.
                let mut job_ids = Vec::new();

                for schedule_job in partitions
                    .idx_schedule_jobs
                    .write(&mut tx)
                    .prefix(&indexes::ScheduleJobIndexKey::new_prefix(schedule_id))
                {
                    let (job_id, _) = schedule_job?;
                    let job_id = job_id.value().job_id.unwrap();

                    job_ids.push(job_id);
                }

                for job_id in job_ids {
                    delete_job(partitions, &mut tx, job_id)?;
                }
            }

            tx.commit()?;

            Ok(deleted_schedules)
        })
        .await
    }

    async fn job_added_conditionally(
        &self,
        _job: ora_storage::NewJob,
        _filters: ora_storage::JobQueryFilters,
    ) -> eyre::Result<ora_storage::ConditionalJobResult> {
        bail!("not supported")
    }

    async fn schedule_added_conditionally(
        &self,
        _schedule: ora_storage::NewSchedule,
        _filters: ora_storage::ScheduleQueryFilters,
    ) -> eyre::Result<ora_storage::ConditionalScheduleResult> {
        bail!("not supported")
    }
}

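/// Deletes an inactive job along with its terminal (succeeded or failed)
/// executions and all of its index entries.
///
/// Returns `true` if a job was removed; active jobs are left untouched.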
fn delete_job<'a>(
    partitions: &'a Partitions,
    tx: &mut WriteTransaction<'a>,
    job_id: Uuid,
) -> Result<bool, eyre::Error> {
    let Some(job_data) = partitions.inactive_jobs.write(tx).take(&job_id)? else {
        return Ok(false);
    };
    let job_data = job_data.value();
    let mut execution_ids = Vec::new();
    for job_execution in partitions
        .idx_job_executions
        .write(tx)
        .prefix(&indexes::JobExecutionIndexKey::new_prefix(job_id))
    {
        let (job_execution_id, _) = job_execution?;
        let job_execution_id = job_execution_id.value().execution_id.unwrap();

        execution_ids.push(job_execution_id);
    }
    for job_execution_id in execution_ids {
        partitions
            .succeeded_executions
            .write(tx)
            .remove(&job_execution_id);
        partitions
            .failed_executions
            .write(tx)
            .remove(&job_execution_id);

        partitions
            .idx_job_executions
            .write(tx)
            .remove(&indexes::JobExecutionIndexKey::new(
                job_id,
                job_execution_id,
            ));
    }
    for (key, value) in job_data.labels.iter() {
        let label_key = indexes::LabelIndexKey::new(key, value, job_id);
        partitions.idx_job_labels.write(tx).remove(&label_key);
    }

    if let Some(schedule_id) = partitions.idx_job_schedule.write(tx).get(&job_id)? {
        partitions
            .idx_schedule_jobs
            .write(tx)
            .remove(&indexes::ScheduleJobIndexKey::new(
                schedule_id.value(),
                job_id,
            ));
        partitions.idx_job_schedule.write(tx).remove(&job_id);
    }

    Ok(true)
}

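/// Marks a job as unschedulable. If the job has no active execution it is
/// moved to the inactive partition and, if it belongs to a schedule, the
/// schedule is released back to the pending set (unless cancelled).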
fn job_unschedulable<'a>(
    partitions: &'a Partitions,
    tx: &mut WriteTransaction<'a>,
    job_id: Uuid,
    timestamp: SystemTime,
) -> Result<(), eyre::Error> {
    if let Some(job_data) = partitions.active_jobs.write(tx).take(&job_id)? {
        let mut job_data = deserialize!(job_data.value())?;

        if job_data.marked_unschedulable_at.is_none() {
            job_data.marked_unschedulable_at = Some(timestamp);
        }

        if partitions
            .idx_job_active_execution
            .write(tx)
            .contains_key(&job_id)?
        {
            // We cannot mark the job inactive while it has active executions,
            // so we keep it in the active state but mark it unschedulable.
            partitions.active_jobs.write(tx).insert(&job_id, &job_data);
        } else {
            partitions
                .inactive_jobs
                .write(tx)
                .insert(&job_id, &job_data);

            if let Some(schedule_id) = job_data.schedule_id {
                partitions
                    .idx_schedule_active_job
                    .write(tx)
                    .remove(&schedule_id);

                if let Some(schedule_data) =
                    partitions.active_schedules.write(tx).get(&schedule_id)?
                {
                    let schedule_data = schedule_data.value();

                    // The schedule's last job is done; unless the schedule
                    // was cancelled, we mark it as pending again.
                    if schedule_data.cancelled_at.is_none() {
                        partitions
                            .idx_pending_schedules
                            .write(tx)
                            .insert(&schedule_id, &());
                    }
                }
            }
        }

        partitions.idx_pending_jobs.write(tx).remove(&job_id);
    }
    Ok(())
}

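/// Marks a schedule as unschedulable. If the schedule has no active job it
/// is moved to the inactive partition; either way it is removed from the
/// pending set.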
fn schedule_unschedulable<'a>(
    partitions: &'a Partitions,
    tx: &mut WriteTransaction<'a>,
    schedule_id: Uuid,
    timestamp: SystemTime,
) -> Result<(), eyre::Error> {
    if let Some(schedule_data) = partitions.active_schedules.write(tx).take(&schedule_id)? {
        let mut schedule_data = deserialize!(schedule_data.value())?;

        if schedule_data.marked_unschedulable_at.is_none() {
            schedule_data.marked_unschedulable_at = Some(timestamp);
        }

        if partitions
            .idx_schedule_active_job
            .write(tx)
            .contains_key(&schedule_id)?
        {
            // We cannot mark the schedule inactive while it has active jobs,
            // so we keep it in the active state but mark it unschedulable.
            partitions
                .active_schedules
                .write(tx)
                .insert(&schedule_id, &schedule_data);
        } else {
            partitions
                .inactive_schedules
                .write(tx)
                .insert(&schedule_id, &schedule_data);
        }

        partitions
            .idx_pending_schedules
            .write(tx)
            .remove(&schedule_id);
    }
    Ok(())
}