ora_storage_memory/
lib.rs

1//! A simple in-memory storage backend with no persistence.
2
3use std::{sync::Arc, time::SystemTime};
4
5use async_trait::async_trait;
6use eyre::{bail, Context};
7use indexmap::IndexSet;
8use parking_lot::RwLock;
9use uuid::Uuid;
10
11use ora_storage::{
12    CancelledJob, CancelledSchedule, ExecutionDetails, IndexMap, JobExecutionStatus,
13    JobQueryFilters, JobQueryOrder, JobQueryResult, JobRetryPolicy, JobTimeoutPolicy, JobType,
14    NewExecution, NewJob, NewSchedule, PendingExecution, PendingJob, PendingSchedule,
15    ReadyExecution, ScheduleJobCreationPolicy, ScheduleJobTimingPolicy, ScheduleQueryFilters,
16    ScheduleQueryOrder, ScheduleQueryResult, ScheduleTimeRange, Storage,
17};
18
19mod job_query;
20mod schedule_query;
21mod snapshot;
22
23/// A storage backend that holds all data in memory.
24///
25/// It serves as a reference implementation for storage backends
26/// as well as a test platform for the server itself.
27/// It is optimized for simplicity and readability, and is not
28/// intended for production use.
29///
30/// It may be used for small-scale production applications
31/// where persistence is not required and the amount of data
32/// is small enough to fit in memory.
33//
34// Indexes and other helper data structures are omitted
35// on purpose to keep the implementation simple.
36//
37// The implementation also contains more safety checks and
38// assertions that test the server itself.
39#[derive(Debug, Default, Clone)]
40#[must_use]
41pub struct MemoryStorage {
42    job_types: Arc<RwLock<IndexMap<String, JobType>>>,
43
44    // Jobs are partitioned by whether they are schedulable or unschedulable.
45    schedulable_jobs: Arc<RwLock<IndexMap<Uuid, Job>>>,
46    unschedulable_jobs: Arc<RwLock<IndexMap<Uuid, Job>>>,
47
48    // Executions are partitioned by the phase they are in.
49    pending_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
50    ready_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
51    assigned_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
52    started_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
53    succeeded_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
54    failed_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
55
56    // Schedules are partitioned by whether they are schedulable or unschedulable.
57    schedulable_schedules: Arc<RwLock<IndexMap<Uuid, Schedule>>>,
58    unschedulable_schedules: Arc<RwLock<IndexMap<Uuid, Schedule>>>,
59}
60
61impl MemoryStorage {
62    /// Create a new in-memory storage backend.
63    pub fn new() -> Self {
64        Self::default()
65    }
66}
67
68#[async_trait]
69impl Storage for MemoryStorage {
70    async fn job_types_added(&self, job_types: Vec<JobType>) -> eyre::Result<()> {
71        self.job_types.write().extend(
72            job_types
73                .iter()
74                .map(|job_type| (job_type.id.clone(), job_type.clone())),
75        );
76
77        Ok(())
78    }
79
80    async fn jobs_added(&self, new_jobs: Vec<NewJob>) -> eyre::Result<()> {
81        for new_job in new_jobs {
82            let mut jobs = self.schedulable_jobs.write();
83
84            if jobs.contains_key(&new_job.id) {
85                bail!("job with ID {} already exists", new_job.id);
86            }
87
88            jobs.insert(new_job.id, Job::from(new_job));
89        }
90
91        Ok(())
92    }
93
94    async fn job_added_conditionally(
95        &self,
96        _job: NewJob,
97        _filters: JobQueryFilters,
98    ) -> eyre::Result<ora_storage::ConditionalJobResult> {
99        // FIXME: we'd need to add a lock to make this atomic.
100        bail!("not supported");
101    }
102
103    async fn jobs_cancelled(
104        &self,
105        job_ids: &[Uuid],
106        timestamp: SystemTime,
107    ) -> eyre::Result<Vec<CancelledJob>> {
108        let mut cancelled_jobs = Vec::with_capacity(job_ids.len());
109
110        for job_id in job_ids {
111            let active_job = self.schedulable_jobs.write().swap_remove(job_id);
112
113            if let Some(mut job) = active_job {
114                debug_assert!(job.cancelled_at.is_none());
115                job.cancelled_at = Some(timestamp);
116
117                let active_execution = self
118                    .pending_executions
119                    .read()
120                    .iter()
121                    .find_map(|(_, execution)| {
122                        if &execution.job_id == job_id {
123                            Some(execution.id)
124                        } else {
125                            None
126                        }
127                    })
128                    .or_else(|| {
129                        self.ready_executions
130                            .read()
131                            .iter()
132                            .find_map(|(_, execution)| {
133                                if &execution.job_id == job_id {
134                                    Some(execution.id)
135                                } else {
136                                    None
137                                }
138                            })
139                    })
140                    .or_else(|| {
141                        self.assigned_executions
142                            .read()
143                            .iter()
144                            .find_map(|(_, execution)| {
145                                if &execution.job_id == job_id {
146                                    Some(execution.id)
147                                } else {
148                                    None
149                                }
150                            })
151                    })
152                    .or_else(|| {
153                        self.started_executions
154                            .read()
155                            .iter()
156                            .find_map(|(_, execution)| {
157                                if &execution.job_id == job_id {
158                                    Some(execution.id)
159                                } else {
160                                    None
161                                }
162                            })
163                    });
164
165                if active_execution.is_some() {
166                    job.marked_unschedulable_at = Some(timestamp);
167                    self.unschedulable_jobs.write().insert(*job_id, job);
168                } else {
169                    self.schedulable_jobs.write().insert(*job_id, job);
170                }
171
172                cancelled_jobs.push(CancelledJob {
173                    id: *job_id,
174                    active_execution,
175                });
176            }
177        }
178
179        Ok(cancelled_jobs)
180    }
181
182    async fn executions_added(
183        &self,
184        executions: Vec<NewExecution>,
185        timestamp: SystemTime,
186    ) -> eyre::Result<()> {
187        for execution in executions {
188            let mut pending_executions = self.pending_executions.write();
189            if pending_executions.contains_key(&execution.id) {
190                bail!("execution with ID {} already exists", execution.id);
191            }
192
193            pending_executions.insert(
194                execution.id,
195                Execution {
196                    id: execution.id,
197                    job_id: execution.job_id,
198                    target_execution_time: execution.target_execution_time,
199                    created_at: timestamp,
200                    executor_id: None,
201                    ready_at: None,
202                    assigned_at: None,
203                    started_at: None,
204                    succeeded_at: None,
205                    failed_at: None,
206                    output_payload_json: None,
207                    failure_reason: None,
208                },
209            );
210        }
211
212        Ok(())
213    }
214
215    async fn executions_ready(
216        &self,
217        execution_ids: &[Uuid],
218        timestamp: SystemTime,
219    ) -> eyre::Result<()> {
220        for execution_id in execution_ids {
221            let execution = self.pending_executions.write().swap_remove(execution_id);
222
223            if let Some(mut execution) = execution {
224                debug_assert!(execution.ready_at.is_none());
225                execution.ready_at = Some(timestamp);
226                self.ready_executions
227                    .write()
228                    .insert(*execution_id, execution);
229            }
230        }
231
232        Ok(())
233    }
234
235    async fn execution_assigned(
236        &self,
237        execution_id: Uuid,
238        executor_id: Uuid,
239        timestamp: SystemTime,
240    ) -> eyre::Result<()> {
241        let execution = self.ready_executions.write().swap_remove(&execution_id);
242
243        if let Some(mut execution) = execution {
244            debug_assert!(execution.assigned_at.is_none());
245            execution.assigned_at = Some(timestamp);
246            execution.executor_id = Some(executor_id);
247
248            self.assigned_executions
249                .write()
250                .insert(execution_id, execution);
251        }
252
253        Ok(())
254    }
255
256    async fn execution_started(
257        &self,
258        execution_id: Uuid,
259        timestamp: SystemTime,
260    ) -> eyre::Result<()> {
261        let execution = self.assigned_executions.write().swap_remove(&execution_id);
262        if let Some(mut execution) = execution {
263            debug_assert!(execution.started_at.is_none());
264            execution.started_at = Some(timestamp);
265
266            self.started_executions
267                .write()
268                .insert(execution_id, execution);
269        }
270
271        Ok(())
272    }
273
274    async fn execution_succeeded(
275        &self,
276        execution_id: Uuid,
277        timestamp: SystemTime,
278        output_payload_json: String,
279    ) -> eyre::Result<()> {
280        // executions may succeed at any phase, so we need to check all phases
281        let mut execution = if let Some(execution) =
282            self.pending_executions.write().swap_remove(&execution_id)
283        {
284            execution
285        } else if let Some(execution) = self.ready_executions.write().swap_remove(&execution_id) {
286            execution
287        } else if let Some(execution) = self.assigned_executions.write().swap_remove(&execution_id)
288        {
289            execution
290        } else if let Some(execution) = self.started_executions.write().swap_remove(&execution_id) {
291            execution
292        } else {
293            return Ok(());
294        };
295
296        debug_assert!(execution.succeeded_at.is_none());
297        debug_assert!(execution.failed_at.is_none());
298
299        execution.succeeded_at = Some(timestamp);
300        execution.output_payload_json = Some(output_payload_json);
301
302        let job = self.schedulable_jobs.write().swap_remove(&execution.job_id);
303        if let Some(mut job) = job {
304            debug_assert!(job.marked_unschedulable_at.is_none());
305            job.marked_unschedulable_at = Some(timestamp);
306
307            self.unschedulable_jobs
308                .write()
309                .insert(execution.job_id, job);
310        }
311
312        self.succeeded_executions
313            .write()
314            .insert(execution_id, execution);
315
316        Ok(())
317    }
318
319    async fn executions_failed(
320        &self,
321        execution_ids: &[Uuid],
322        timestamp: SystemTime,
323        reason: String,
324        mark_job_inactive: bool,
325    ) -> eyre::Result<()> {
326        // executions may fail at any phase, so we need to check all phases
327        for execution_id in execution_ids {
328            let mut execution = if let Some(execution) =
329                self.pending_executions.write().swap_remove(execution_id)
330            {
331                execution
332            } else if let Some(execution) = self.ready_executions.write().swap_remove(execution_id)
333            {
334                execution
335            } else if let Some(execution) =
336                self.assigned_executions.write().swap_remove(execution_id)
337            {
338                execution
339            } else if let Some(execution) =
340                self.started_executions.write().swap_remove(execution_id)
341            {
342                execution
343            } else {
344                return Ok(());
345            };
346
347            debug_assert!(execution.succeeded_at.is_none());
348            debug_assert!(execution.failed_at.is_none());
349
350            execution.failed_at = Some(timestamp);
351            execution.failure_reason = Some(reason.clone());
352
353            if mark_job_inactive {
354                let job = self.schedulable_jobs.write().swap_remove(&execution.job_id);
355                if let Some(mut job) = job {
356                    debug_assert!(job.marked_unschedulable_at.is_none());
357                    job.marked_unschedulable_at = Some(timestamp);
358
359                    self.unschedulable_jobs
360                        .write()
361                        .insert(execution.job_id, job);
362                }
363            }
364
365            self.failed_executions
366                .write()
367                .insert(*execution_id, execution);
368        }
369
370        Ok(())
371    }
372
373    async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>> {
374        Ok(self
375            .assigned_executions
376            .read()
377            .iter()
378            .filter_map(|(id, execution)| {
379                if executor_ids.contains(&execution.executor_id.unwrap()) {
380                    None
381                } else {
382                    Some(*id)
383                }
384            })
385            .chain(
386                self.started_executions
387                    .read()
388                    .iter()
389                    .filter_map(|(id, execution)| {
390                        if executor_ids.contains(&execution.executor_id.unwrap()) {
391                            None
392                        } else {
393                            Some(*id)
394                        }
395                    }),
396            )
397            .collect())
398    }
399
400    async fn jobs_unschedulable(
401        &self,
402        job_ids: &[Uuid],
403        timestamp: SystemTime,
404    ) -> eyre::Result<()> {
405        for job_id in job_ids {
406            let job = self.schedulable_jobs.write().swap_remove(job_id);
407            if let Some(mut job) = job {
408                debug_assert!(job.marked_unschedulable_at.is_none());
409                job.marked_unschedulable_at = Some(timestamp);
410
411                self.unschedulable_jobs.write().insert(*job_id, job);
412            } else {
413                debug_assert!(false, "active job with ID {job_id} not found");
414            }
415        }
416
417        Ok(())
418    }
419
420    async fn pending_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingExecution>> {
421        Ok(self
422            .pending_executions
423            .read()
424            .iter()
425            .filter_map(|(id, execution)| {
426                if let Some(after) = after {
427                    if *id <= after {
428                        return None;
429                    }
430                }
431
432                Some(PendingExecution {
433                    id: *id,
434                    target_execution_time: execution.target_execution_time,
435                })
436            })
437            .collect())
438    }
439
440    async fn ready_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<ReadyExecution>> {
441        Ok({
442            let mut executions = self
443                .ready_executions
444                .read()
445                .iter()
446                .filter_map(|(id, execution)| {
447                    let jobs = self.schedulable_jobs.read();
448                    let Some(job) = jobs.get(&execution.job_id) else {
449                        debug_assert!(false, "active job with ID {} not found", execution.job_id);
450                        return None;
451                    };
452
453                    Some(ReadyExecution {
454                        id: *id,
455                        target_execution_time: execution.target_execution_time,
456                        job_id: execution.job_id,
457                        input_payload_json: job.input_payload_json.clone(),
458                        attempt_number: 0,
459                        job_type_id: job.job_type_id.clone(),
460                        timeout_policy: job.timeout_policy,
461                    })
462                })
463                .collect::<Vec<_>>();
464
465            executions.sort_by_key(|execution| execution.id);
466
467            if let Some(after) = after {
468                executions.retain(|execution| execution.id > after);
469            }
470
471            for execution in &mut executions {
472                execution.attempt_number = u64::try_from(
473                    self.executions_by_job_id(execution.job_id)
474                        .position(|id| id == execution.id)
475                        .unwrap(),
476                )
477                .unwrap()
478                    + 1;
479            }
480
481            executions
482        })
483    }
484
485    async fn pending_jobs(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingJob>> {
486        let mut jobs: Vec<PendingJob> = {
487            let pending_executions = self.pending_executions.read();
488            let ready_executions = self.ready_executions.read();
489            let assigned_executions = self.assigned_executions.read();
490            let started_executions = self.started_executions.read();
491
492            self.schedulable_jobs
493                .read()
494                .iter()
495                .filter_map(|(job_id, job)| {
496                    if pending_executions
497                        .values()
498                        .any(|execution| execution.job_id == *job_id)
499                    {
500                        return None;
501                    }
502
503                    if ready_executions
504                        .values()
505                        .any(|execution| execution.job_id == *job_id)
506                    {
507                        return None;
508                    }
509
510                    if assigned_executions
511                        .values()
512                        .any(|execution| execution.job_id == *job_id)
513                    {
514                        return None;
515                    }
516
517                    if started_executions
518                        .values()
519                        .any(|execution| execution.job_id == *job_id)
520                    {
521                        return None;
522                    }
523
524                    if let Some(after) = after {
525                        if job.id <= after {
526                            return None;
527                        }
528                    }
529
530                    Some(PendingJob {
531                        id: job.id,
532                        target_execution_time: job.target_execution_time,
533                        execution_count: 0,
534                        retry_policy: job.retry_policy,
535                        timeout_policy: job.timeout_policy,
536                    })
537                })
538                .collect::<Vec<_>>()
539        };
540
541        for job in &mut jobs {
542            job.execution_count = u64::try_from(self.executions_by_job_id(job.id).len()).unwrap();
543        }
544
545        Ok(jobs)
546    }
547
548    async fn query_jobs(
549        &self,
550        cursor: Option<String>,
551        limit: usize,
552        order: JobQueryOrder,
553        filters: JobQueryFilters,
554    ) -> eyre::Result<JobQueryResult> {
555        let cursor: Option<job_query::Cursor> = match cursor {
556            Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
557            None => None,
558        };
559
560        Ok(self.query_jobs_impl(cursor, limit, order, filters))
561    }
562
563    async fn query_job_ids(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>> {
564        Ok(self.query_job_ids_impl(filters))
565    }
566
567    async fn count_jobs(&self, filters: JobQueryFilters) -> eyre::Result<u64> {
568        Ok(self.count_jobs_impl(filters))
569    }
570
571    async fn query_job_types(&self) -> eyre::Result<Vec<JobType>> {
572        Ok(self.job_types.read().values().cloned().collect())
573    }
574
575    async fn delete_jobs(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>> {
576        let job_ids = self.query_job_ids_impl(filters);
577
578        let mut executions_to_remove = Vec::new();
579
580        for job_id in &job_ids {
581            self.unschedulable_jobs
582                .write()
583                .swap_remove(job_id)
584                .or_else(|| self.schedulable_jobs.write().swap_remove(job_id));
585
586            executions_to_remove.extend(self.executions_by_job_id(*job_id));
587        }
588
589        self.pending_executions
590            .write()
591            .retain(|id, _| !executions_to_remove.contains(id));
592        self.ready_executions
593            .write()
594            .retain(|id, _| !executions_to_remove.contains(id));
595        self.assigned_executions
596            .write()
597            .retain(|id, _| !executions_to_remove.contains(id));
598        self.started_executions
599            .write()
600            .retain(|id, _| !executions_to_remove.contains(id));
601        self.succeeded_executions
602            .write()
603            .retain(|id, _| !executions_to_remove.contains(id));
604        self.failed_executions
605            .write()
606            .retain(|id, _| !executions_to_remove.contains(id));
607
608        Ok(job_ids)
609    }
610
611    async fn schedules_added(&self, schedules: Vec<NewSchedule>) -> eyre::Result<()> {
612        for schedule in schedules {
613            let mut active_schedules = self.schedulable_schedules.write();
614
615            if active_schedules.contains_key(&schedule.id) {
616                bail!("schedule with ID {} already exists", schedule.id);
617            }
618
619            active_schedules.insert(schedule.id, Schedule::from(schedule));
620        }
621
622        Ok(())
623    }
624
625    async fn schedule_added_conditionally(
626        &self,
627        _schedule: NewSchedule,
628        _filters: ScheduleQueryFilters,
629    ) -> eyre::Result<ora_storage::ConditionalScheduleResult> {
630        // FIXME: we'd need to add a lock to make this atomic.
631        bail!("not supported");
632    }
633
634    async fn schedules_cancelled(
635        &self,
636        schedule_ids: &[Uuid],
637        timestamp: SystemTime,
638    ) -> eyre::Result<Vec<CancelledSchedule>> {
639        let mut cancelled_schedules = Vec::with_capacity(schedule_ids.len());
640
641        for schedule_id in schedule_ids {
642            let schedule = self.schedulable_schedules.write().swap_remove(schedule_id);
643
644            if let Some(mut schedule) = schedule {
645                debug_assert!(schedule.cancelled_at.is_none());
646                schedule.cancelled_at = Some(timestamp);
647
648                debug_assert!(schedule.marked_unschedulable_at.is_none());
649                schedule.marked_unschedulable_at = Some(timestamp);
650
651                let schedule_id = schedule.id;
652
653                self.unschedulable_schedules
654                    .write()
655                    .insert(schedule_id, schedule);
656
657                cancelled_schedules.push(CancelledSchedule { id: schedule_id });
658            }
659        }
660
661        Ok(cancelled_schedules)
662    }
663
664    async fn schedules_unschedulable(
665        &self,
666        schedule_ids: &[Uuid],
667        timestamp: SystemTime,
668    ) -> eyre::Result<()> {
669        for schedule_id in schedule_ids {
670            let schedule = self.schedulable_schedules.write().swap_remove(schedule_id);
671
672            if let Some(mut schedule) = schedule {
673                debug_assert!(schedule.marked_unschedulable_at.is_none());
674                schedule.marked_unschedulable_at = Some(timestamp);
675
676                let schedule_id = schedule.id;
677
678                self.unschedulable_schedules
679                    .write()
680                    .insert(schedule_id, schedule);
681            }
682        }
683
684        Ok(())
685    }
686
687    async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>> {
688        Ok(self
689            .schedulable_schedules
690            .read()
691            .iter()
692            .filter_map(|(id, schedule)| {
693                if self
694                    .schedulable_jobs
695                    .read()
696                    .values()
697                    .any(|job| job.schedule_id == Some(schedule.id))
698                {
699                    return None;
700                }
701
702                if let Some(after) = after {
703                    if *id <= after {
704                        return None;
705                    }
706                }
707
708                Some(PendingSchedule {
709                    id: *id,
710                    job_timing_policy: schedule.job_timing_policy.clone(),
711                    job_creation_policy: schedule.job_creation_policy.clone(),
712                    last_target_execution_time: self.last_target_execution_time(schedule.id),
713                    time_range: schedule.time_range,
714                })
715            })
716            .collect::<Vec<_>>())
717    }
718
719    async fn query_schedules(
720        &self,
721        cursor: Option<String>,
722        limit: usize,
723        filters: ScheduleQueryFilters,
724        order: ScheduleQueryOrder,
725    ) -> eyre::Result<ScheduleQueryResult> {
726        let cursor: Option<schedule_query::Cursor> = match cursor {
727            Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
728            None => None,
729        };
730
731        Ok(self.query_schedules_impl(cursor, limit, order, filters))
732    }
733
734    async fn query_schedule_ids(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>> {
735        Ok(self.query_schedule_ids_impl(filters))
736    }
737
738    async fn count_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<u64> {
739        Ok(self.count_schedules_impl(filters))
740    }
741
742    async fn delete_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>> {
743        let schedule_ids = self
744            .query_schedule_ids_impl(filters)
745            .into_iter()
746            .collect::<IndexSet<_>>();
747
748        // We don't care about orphaned jobs here.
749        self.schedulable_schedules
750            .write()
751            .retain(|id, _| !schedule_ids.contains(id));
752        self.unschedulable_schedules
753            .write()
754            .retain(|id, _| !schedule_ids.contains(id));
755
756        Ok(schedule_ids.into_iter().collect())
757    }
758}
759
760impl MemoryStorage {
761    /// Returns all execution IDs for a job in creation order.
762    fn executions_by_job_id(&self, job_id: Uuid) -> impl ExactSizeIterator<Item = Uuid> {
763        let mut execution_ids = self
764            .pending_executions
765            .read()
766            .values()
767            .chain(self.ready_executions.read().values())
768            .chain(self.assigned_executions.read().values())
769            .chain(self.started_executions.read().values())
770            .chain(self.succeeded_executions.read().values())
771            .chain(self.failed_executions.read().values())
772            .filter(move |execution| execution.job_id == job_id)
773            .map(|execution| execution.id)
774            .collect::<Vec<_>>();
775
776        execution_ids.sort_unstable();
777
778        execution_ids.into_iter()
779    }
780
781    /// Returns the last target execution time of a schedule.
782    fn last_target_execution_time(&self, schedule_id: Uuid) -> Option<SystemTime> {
783        self.schedulable_jobs
784            .read()
785            .values()
786            .filter(|job| job.schedule_id == Some(schedule_id))
787            .map(|job| job.target_execution_time)
788            .max()
789            .max(
790                self.unschedulable_jobs
791                    .read()
792                    .values()
793                    .filter(|job| job.schedule_id == Some(schedule_id))
794                    .map(|job| job.target_execution_time)
795                    .max(),
796            )
797    }
798}
799
800#[derive(Debug, Clone)]
801struct Job {
802    id: Uuid,
803    schedule_id: Option<Uuid>,
804    created_at: SystemTime,
805    job_type_id: String,
806    target_execution_time: SystemTime,
807    retry_policy: JobRetryPolicy,
808    timeout_policy: JobTimeoutPolicy,
809    labels: IndexMap<String, String>,
810    marked_unschedulable_at: Option<SystemTime>,
811    cancelled_at: Option<SystemTime>,
812    input_payload_json: String,
813    metadata_json: Option<String>,
814}
815
816impl From<NewJob> for Job {
817    fn from(job: NewJob) -> Self {
818        Self {
819            id: job.id,
820            schedule_id: job.schedule_id,
821            created_at: job.created_at,
822            job_type_id: job.job_type_id,
823            target_execution_time: job.target_execution_time,
824            retry_policy: job.retry_policy,
825            timeout_policy: job.timeout_policy,
826            labels: job.labels,
827            input_payload_json: job.input_payload_json,
828            marked_unschedulable_at: None,
829            cancelled_at: None,
830            metadata_json: job.metadata_json,
831        }
832    }
833}
834
835#[derive(Debug, Clone)]
836struct Execution {
837    id: Uuid,
838    job_id: Uuid,
839    target_execution_time: SystemTime,
840    executor_id: Option<Uuid>,
841    created_at: SystemTime,
842    ready_at: Option<SystemTime>,
843    assigned_at: Option<SystemTime>,
844    started_at: Option<SystemTime>,
845    succeeded_at: Option<SystemTime>,
846    failed_at: Option<SystemTime>,
847    output_payload_json: Option<String>,
848    failure_reason: Option<String>,
849}
850
851impl From<&Execution> for ExecutionDetails {
852    fn from(value: &Execution) -> Self {
853        Self {
854            id: value.id,
855            job_id: value.job_id,
856            executor_id: value.executor_id,
857            status: if value.succeeded_at.is_some() {
858                JobExecutionStatus::Succeeded
859            } else if value.failed_at.is_some() {
860                JobExecutionStatus::Failed
861            } else if value.started_at.is_some() {
862                JobExecutionStatus::Running
863            } else if value.assigned_at.is_some() {
864                JobExecutionStatus::Assigned
865            } else if value.ready_at.is_some() {
866                JobExecutionStatus::Ready
867            } else {
868                JobExecutionStatus::Pending
869            },
870            created_at: value.created_at,
871            ready_at: value.ready_at,
872            assigned_at: value.assigned_at,
873            started_at: value.started_at,
874            succeeded_at: value.succeeded_at,
875            failed_at: value.failed_at,
876            output_payload_json: value.output_payload_json.clone(),
877            failure_reason: value.failure_reason.clone(),
878        }
879    }
880}
881
882#[derive(Debug, Clone)]
883struct Schedule {
884    id: Uuid,
885    created_at: SystemTime,
886    job_type_id: Option<String>,
887    labels: IndexMap<String, String>,
888    marked_unschedulable_at: Option<SystemTime>,
889    cancelled_at: Option<SystemTime>,
890    job_timing_policy: ScheduleJobTimingPolicy,
891    job_creation_policy: ScheduleJobCreationPolicy,
892    time_range: Option<ScheduleTimeRange>,
893    metadata_json: Option<String>,
894}
895
896impl From<NewSchedule> for Schedule {
897    fn from(schedule: NewSchedule) -> Self {
898        Self {
899            id: schedule.id,
900            created_at: schedule.created_at,
901            job_type_id: match &schedule.job_creation_policy {
902                ScheduleJobCreationPolicy::JobDefinition(schedule_new_job_definition) => {
903                    Some(schedule_new_job_definition.job_type_id.clone())
904                }
905            },
906            labels: schedule.labels,
907            marked_unschedulable_at: None,
908            cancelled_at: None,
909            job_timing_policy: schedule.job_timing_policy,
910            job_creation_policy: schedule.job_creation_policy,
911            time_range: schedule.time_range,
912            metadata_json: schedule.metadata_json,
913        }
914    }
915}