1use std::{sync::Arc, time::SystemTime};
4
5use async_trait::async_trait;
6use eyre::{bail, Context};
7use indexmap::IndexSet;
8use parking_lot::RwLock;
9use uuid::Uuid;
10
11use ora_storage::{
12 CancelledJob, CancelledSchedule, ExecutionDetails, IndexMap, JobExecutionStatus,
13 JobQueryFilters, JobQueryOrder, JobQueryResult, JobRetryPolicy, JobTimeoutPolicy, JobType,
14 NewExecution, NewJob, NewSchedule, PendingExecution, PendingJob, PendingSchedule,
15 ReadyExecution, ScheduleJobCreationPolicy, ScheduleJobTimingPolicy, ScheduleQueryFilters,
16 ScheduleQueryOrder, ScheduleQueryResult, ScheduleTimeRange, Storage,
17};
18
19mod job_query;
20mod schedule_query;
21mod snapshot;
22
23#[derive(Debug, Default, Clone)]
40#[must_use]
41pub struct MemoryStorage {
42 job_types: Arc<RwLock<IndexMap<String, JobType>>>,
43
44 schedulable_jobs: Arc<RwLock<IndexMap<Uuid, Job>>>,
46 unschedulable_jobs: Arc<RwLock<IndexMap<Uuid, Job>>>,
47
48 pending_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
50 ready_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
51 assigned_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
52 started_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
53 succeeded_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
54 failed_executions: Arc<RwLock<IndexMap<Uuid, Execution>>>,
55
56 schedulable_schedules: Arc<RwLock<IndexMap<Uuid, Schedule>>>,
58 unschedulable_schedules: Arc<RwLock<IndexMap<Uuid, Schedule>>>,
59}
60
61impl MemoryStorage {
62 pub fn new() -> Self {
64 Self::default()
65 }
66}
67
68#[async_trait]
69impl Storage for MemoryStorage {
70 async fn job_types_added(&self, job_types: Vec<JobType>) -> eyre::Result<()> {
71 self.job_types.write().extend(
72 job_types
73 .iter()
74 .map(|job_type| (job_type.id.clone(), job_type.clone())),
75 );
76
77 Ok(())
78 }
79
80 async fn jobs_added(&self, new_jobs: Vec<NewJob>) -> eyre::Result<()> {
81 for new_job in new_jobs {
82 let mut jobs = self.schedulable_jobs.write();
83
84 if jobs.contains_key(&new_job.id) {
85 bail!("job with ID {} already exists", new_job.id);
86 }
87
88 jobs.insert(new_job.id, Job::from(new_job));
89 }
90
91 Ok(())
92 }
93
94 async fn job_added_conditionally(
95 &self,
96 _job: NewJob,
97 _filters: JobQueryFilters,
98 ) -> eyre::Result<ora_storage::ConditionalJobResult> {
99 bail!("not supported");
101 }
102
103 async fn jobs_cancelled(
104 &self,
105 job_ids: &[Uuid],
106 timestamp: SystemTime,
107 ) -> eyre::Result<Vec<CancelledJob>> {
108 let mut cancelled_jobs = Vec::with_capacity(job_ids.len());
109
110 for job_id in job_ids {
111 let active_job = self.schedulable_jobs.write().swap_remove(job_id);
112
113 if let Some(mut job) = active_job {
114 debug_assert!(job.cancelled_at.is_none());
115 job.cancelled_at = Some(timestamp);
116
117 let active_execution = self
118 .pending_executions
119 .read()
120 .iter()
121 .find_map(|(_, execution)| {
122 if &execution.job_id == job_id {
123 Some(execution.id)
124 } else {
125 None
126 }
127 })
128 .or_else(|| {
129 self.ready_executions
130 .read()
131 .iter()
132 .find_map(|(_, execution)| {
133 if &execution.job_id == job_id {
134 Some(execution.id)
135 } else {
136 None
137 }
138 })
139 })
140 .or_else(|| {
141 self.assigned_executions
142 .read()
143 .iter()
144 .find_map(|(_, execution)| {
145 if &execution.job_id == job_id {
146 Some(execution.id)
147 } else {
148 None
149 }
150 })
151 })
152 .or_else(|| {
153 self.started_executions
154 .read()
155 .iter()
156 .find_map(|(_, execution)| {
157 if &execution.job_id == job_id {
158 Some(execution.id)
159 } else {
160 None
161 }
162 })
163 });
164
165 if active_execution.is_some() {
166 job.marked_unschedulable_at = Some(timestamp);
167 self.unschedulable_jobs.write().insert(*job_id, job);
168 } else {
169 self.schedulable_jobs.write().insert(*job_id, job);
170 }
171
172 cancelled_jobs.push(CancelledJob {
173 id: *job_id,
174 active_execution,
175 });
176 }
177 }
178
179 Ok(cancelled_jobs)
180 }
181
182 async fn executions_added(
183 &self,
184 executions: Vec<NewExecution>,
185 timestamp: SystemTime,
186 ) -> eyre::Result<()> {
187 for execution in executions {
188 let mut pending_executions = self.pending_executions.write();
189 if pending_executions.contains_key(&execution.id) {
190 bail!("execution with ID {} already exists", execution.id);
191 }
192
193 pending_executions.insert(
194 execution.id,
195 Execution {
196 id: execution.id,
197 job_id: execution.job_id,
198 target_execution_time: execution.target_execution_time,
199 created_at: timestamp,
200 executor_id: None,
201 ready_at: None,
202 assigned_at: None,
203 started_at: None,
204 succeeded_at: None,
205 failed_at: None,
206 output_payload_json: None,
207 failure_reason: None,
208 },
209 );
210 }
211
212 Ok(())
213 }
214
215 async fn executions_ready(
216 &self,
217 execution_ids: &[Uuid],
218 timestamp: SystemTime,
219 ) -> eyre::Result<()> {
220 for execution_id in execution_ids {
221 let execution = self.pending_executions.write().swap_remove(execution_id);
222
223 if let Some(mut execution) = execution {
224 debug_assert!(execution.ready_at.is_none());
225 execution.ready_at = Some(timestamp);
226 self.ready_executions
227 .write()
228 .insert(*execution_id, execution);
229 }
230 }
231
232 Ok(())
233 }
234
235 async fn execution_assigned(
236 &self,
237 execution_id: Uuid,
238 executor_id: Uuid,
239 timestamp: SystemTime,
240 ) -> eyre::Result<()> {
241 let execution = self.ready_executions.write().swap_remove(&execution_id);
242
243 if let Some(mut execution) = execution {
244 debug_assert!(execution.assigned_at.is_none());
245 execution.assigned_at = Some(timestamp);
246 execution.executor_id = Some(executor_id);
247
248 self.assigned_executions
249 .write()
250 .insert(execution_id, execution);
251 }
252
253 Ok(())
254 }
255
256 async fn execution_started(
257 &self,
258 execution_id: Uuid,
259 timestamp: SystemTime,
260 ) -> eyre::Result<()> {
261 let execution = self.assigned_executions.write().swap_remove(&execution_id);
262 if let Some(mut execution) = execution {
263 debug_assert!(execution.started_at.is_none());
264 execution.started_at = Some(timestamp);
265
266 self.started_executions
267 .write()
268 .insert(execution_id, execution);
269 }
270
271 Ok(())
272 }
273
274 async fn execution_succeeded(
275 &self,
276 execution_id: Uuid,
277 timestamp: SystemTime,
278 output_payload_json: String,
279 ) -> eyre::Result<()> {
280 let mut execution = if let Some(execution) =
282 self.pending_executions.write().swap_remove(&execution_id)
283 {
284 execution
285 } else if let Some(execution) = self.ready_executions.write().swap_remove(&execution_id) {
286 execution
287 } else if let Some(execution) = self.assigned_executions.write().swap_remove(&execution_id)
288 {
289 execution
290 } else if let Some(execution) = self.started_executions.write().swap_remove(&execution_id) {
291 execution
292 } else {
293 return Ok(());
294 };
295
296 debug_assert!(execution.succeeded_at.is_none());
297 debug_assert!(execution.failed_at.is_none());
298
299 execution.succeeded_at = Some(timestamp);
300 execution.output_payload_json = Some(output_payload_json);
301
302 let job = self.schedulable_jobs.write().swap_remove(&execution.job_id);
303 if let Some(mut job) = job {
304 debug_assert!(job.marked_unschedulable_at.is_none());
305 job.marked_unschedulable_at = Some(timestamp);
306
307 self.unschedulable_jobs
308 .write()
309 .insert(execution.job_id, job);
310 }
311
312 self.succeeded_executions
313 .write()
314 .insert(execution_id, execution);
315
316 Ok(())
317 }
318
319 async fn executions_failed(
320 &self,
321 execution_ids: &[Uuid],
322 timestamp: SystemTime,
323 reason: String,
324 mark_job_inactive: bool,
325 ) -> eyre::Result<()> {
326 for execution_id in execution_ids {
328 let mut execution = if let Some(execution) =
329 self.pending_executions.write().swap_remove(execution_id)
330 {
331 execution
332 } else if let Some(execution) = self.ready_executions.write().swap_remove(execution_id)
333 {
334 execution
335 } else if let Some(execution) =
336 self.assigned_executions.write().swap_remove(execution_id)
337 {
338 execution
339 } else if let Some(execution) =
340 self.started_executions.write().swap_remove(execution_id)
341 {
342 execution
343 } else {
344 return Ok(());
345 };
346
347 debug_assert!(execution.succeeded_at.is_none());
348 debug_assert!(execution.failed_at.is_none());
349
350 execution.failed_at = Some(timestamp);
351 execution.failure_reason = Some(reason.clone());
352
353 if mark_job_inactive {
354 let job = self.schedulable_jobs.write().swap_remove(&execution.job_id);
355 if let Some(mut job) = job {
356 debug_assert!(job.marked_unschedulable_at.is_none());
357 job.marked_unschedulable_at = Some(timestamp);
358
359 self.unschedulable_jobs
360 .write()
361 .insert(execution.job_id, job);
362 }
363 }
364
365 self.failed_executions
366 .write()
367 .insert(*execution_id, execution);
368 }
369
370 Ok(())
371 }
372
373 async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>> {
374 Ok(self
375 .assigned_executions
376 .read()
377 .iter()
378 .filter_map(|(id, execution)| {
379 if executor_ids.contains(&execution.executor_id.unwrap()) {
380 None
381 } else {
382 Some(*id)
383 }
384 })
385 .chain(
386 self.started_executions
387 .read()
388 .iter()
389 .filter_map(|(id, execution)| {
390 if executor_ids.contains(&execution.executor_id.unwrap()) {
391 None
392 } else {
393 Some(*id)
394 }
395 }),
396 )
397 .collect())
398 }
399
400 async fn jobs_unschedulable(
401 &self,
402 job_ids: &[Uuid],
403 timestamp: SystemTime,
404 ) -> eyre::Result<()> {
405 for job_id in job_ids {
406 let job = self.schedulable_jobs.write().swap_remove(job_id);
407 if let Some(mut job) = job {
408 debug_assert!(job.marked_unschedulable_at.is_none());
409 job.marked_unschedulable_at = Some(timestamp);
410
411 self.unschedulable_jobs.write().insert(*job_id, job);
412 } else {
413 debug_assert!(false, "active job with ID {job_id} not found");
414 }
415 }
416
417 Ok(())
418 }
419
420 async fn pending_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingExecution>> {
421 Ok(self
422 .pending_executions
423 .read()
424 .iter()
425 .filter_map(|(id, execution)| {
426 if let Some(after) = after {
427 if *id <= after {
428 return None;
429 }
430 }
431
432 Some(PendingExecution {
433 id: *id,
434 target_execution_time: execution.target_execution_time,
435 })
436 })
437 .collect())
438 }
439
440 async fn ready_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<ReadyExecution>> {
441 Ok({
442 let mut executions = self
443 .ready_executions
444 .read()
445 .iter()
446 .filter_map(|(id, execution)| {
447 let jobs = self.schedulable_jobs.read();
448 let Some(job) = jobs.get(&execution.job_id) else {
449 debug_assert!(false, "active job with ID {} not found", execution.job_id);
450 return None;
451 };
452
453 Some(ReadyExecution {
454 id: *id,
455 target_execution_time: execution.target_execution_time,
456 job_id: execution.job_id,
457 input_payload_json: job.input_payload_json.clone(),
458 attempt_number: 0,
459 job_type_id: job.job_type_id.clone(),
460 timeout_policy: job.timeout_policy,
461 })
462 })
463 .collect::<Vec<_>>();
464
465 executions.sort_by_key(|execution| execution.id);
466
467 if let Some(after) = after {
468 executions.retain(|execution| execution.id > after);
469 }
470
471 for execution in &mut executions {
472 execution.attempt_number = u64::try_from(
473 self.executions_by_job_id(execution.job_id)
474 .position(|id| id == execution.id)
475 .unwrap(),
476 )
477 .unwrap()
478 + 1;
479 }
480
481 executions
482 })
483 }
484
485 async fn pending_jobs(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingJob>> {
486 let mut jobs: Vec<PendingJob> = {
487 let pending_executions = self.pending_executions.read();
488 let ready_executions = self.ready_executions.read();
489 let assigned_executions = self.assigned_executions.read();
490 let started_executions = self.started_executions.read();
491
492 self.schedulable_jobs
493 .read()
494 .iter()
495 .filter_map(|(job_id, job)| {
496 if pending_executions
497 .values()
498 .any(|execution| execution.job_id == *job_id)
499 {
500 return None;
501 }
502
503 if ready_executions
504 .values()
505 .any(|execution| execution.job_id == *job_id)
506 {
507 return None;
508 }
509
510 if assigned_executions
511 .values()
512 .any(|execution| execution.job_id == *job_id)
513 {
514 return None;
515 }
516
517 if started_executions
518 .values()
519 .any(|execution| execution.job_id == *job_id)
520 {
521 return None;
522 }
523
524 if let Some(after) = after {
525 if job.id <= after {
526 return None;
527 }
528 }
529
530 Some(PendingJob {
531 id: job.id,
532 target_execution_time: job.target_execution_time,
533 execution_count: 0,
534 retry_policy: job.retry_policy,
535 timeout_policy: job.timeout_policy,
536 })
537 })
538 .collect::<Vec<_>>()
539 };
540
541 for job in &mut jobs {
542 job.execution_count = u64::try_from(self.executions_by_job_id(job.id).len()).unwrap();
543 }
544
545 Ok(jobs)
546 }
547
548 async fn query_jobs(
549 &self,
550 cursor: Option<String>,
551 limit: usize,
552 order: JobQueryOrder,
553 filters: JobQueryFilters,
554 ) -> eyre::Result<JobQueryResult> {
555 let cursor: Option<job_query::Cursor> = match cursor {
556 Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
557 None => None,
558 };
559
560 Ok(self.query_jobs_impl(cursor, limit, order, filters))
561 }
562
563 async fn query_job_ids(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>> {
564 Ok(self.query_job_ids_impl(filters))
565 }
566
567 async fn count_jobs(&self, filters: JobQueryFilters) -> eyre::Result<u64> {
568 Ok(self.count_jobs_impl(filters))
569 }
570
571 async fn query_job_types(&self) -> eyre::Result<Vec<JobType>> {
572 Ok(self.job_types.read().values().cloned().collect())
573 }
574
575 async fn delete_jobs(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>> {
576 let job_ids = self.query_job_ids_impl(filters);
577
578 let mut executions_to_remove = Vec::new();
579
580 for job_id in &job_ids {
581 self.unschedulable_jobs
582 .write()
583 .swap_remove(job_id)
584 .or_else(|| self.schedulable_jobs.write().swap_remove(job_id));
585
586 executions_to_remove.extend(self.executions_by_job_id(*job_id));
587 }
588
589 self.pending_executions
590 .write()
591 .retain(|id, _| !executions_to_remove.contains(id));
592 self.ready_executions
593 .write()
594 .retain(|id, _| !executions_to_remove.contains(id));
595 self.assigned_executions
596 .write()
597 .retain(|id, _| !executions_to_remove.contains(id));
598 self.started_executions
599 .write()
600 .retain(|id, _| !executions_to_remove.contains(id));
601 self.succeeded_executions
602 .write()
603 .retain(|id, _| !executions_to_remove.contains(id));
604 self.failed_executions
605 .write()
606 .retain(|id, _| !executions_to_remove.contains(id));
607
608 Ok(job_ids)
609 }
610
611 async fn schedules_added(&self, schedules: Vec<NewSchedule>) -> eyre::Result<()> {
612 for schedule in schedules {
613 let mut active_schedules = self.schedulable_schedules.write();
614
615 if active_schedules.contains_key(&schedule.id) {
616 bail!("schedule with ID {} already exists", schedule.id);
617 }
618
619 active_schedules.insert(schedule.id, Schedule::from(schedule));
620 }
621
622 Ok(())
623 }
624
625 async fn schedule_added_conditionally(
626 &self,
627 _schedule: NewSchedule,
628 _filters: ScheduleQueryFilters,
629 ) -> eyre::Result<ora_storage::ConditionalScheduleResult> {
630 bail!("not supported");
632 }
633
634 async fn schedules_cancelled(
635 &self,
636 schedule_ids: &[Uuid],
637 timestamp: SystemTime,
638 ) -> eyre::Result<Vec<CancelledSchedule>> {
639 let mut cancelled_schedules = Vec::with_capacity(schedule_ids.len());
640
641 for schedule_id in schedule_ids {
642 let schedule = self.schedulable_schedules.write().swap_remove(schedule_id);
643
644 if let Some(mut schedule) = schedule {
645 debug_assert!(schedule.cancelled_at.is_none());
646 schedule.cancelled_at = Some(timestamp);
647
648 debug_assert!(schedule.marked_unschedulable_at.is_none());
649 schedule.marked_unschedulable_at = Some(timestamp);
650
651 let schedule_id = schedule.id;
652
653 self.unschedulable_schedules
654 .write()
655 .insert(schedule_id, schedule);
656
657 cancelled_schedules.push(CancelledSchedule { id: schedule_id });
658 }
659 }
660
661 Ok(cancelled_schedules)
662 }
663
664 async fn schedules_unschedulable(
665 &self,
666 schedule_ids: &[Uuid],
667 timestamp: SystemTime,
668 ) -> eyre::Result<()> {
669 for schedule_id in schedule_ids {
670 let schedule = self.schedulable_schedules.write().swap_remove(schedule_id);
671
672 if let Some(mut schedule) = schedule {
673 debug_assert!(schedule.marked_unschedulable_at.is_none());
674 schedule.marked_unschedulable_at = Some(timestamp);
675
676 let schedule_id = schedule.id;
677
678 self.unschedulable_schedules
679 .write()
680 .insert(schedule_id, schedule);
681 }
682 }
683
684 Ok(())
685 }
686
687 async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>> {
688 Ok(self
689 .schedulable_schedules
690 .read()
691 .iter()
692 .filter_map(|(id, schedule)| {
693 if self
694 .schedulable_jobs
695 .read()
696 .values()
697 .any(|job| job.schedule_id == Some(schedule.id))
698 {
699 return None;
700 }
701
702 if let Some(after) = after {
703 if *id <= after {
704 return None;
705 }
706 }
707
708 Some(PendingSchedule {
709 id: *id,
710 job_timing_policy: schedule.job_timing_policy.clone(),
711 job_creation_policy: schedule.job_creation_policy.clone(),
712 last_target_execution_time: self.last_target_execution_time(schedule.id),
713 time_range: schedule.time_range,
714 })
715 })
716 .collect::<Vec<_>>())
717 }
718
719 async fn query_schedules(
720 &self,
721 cursor: Option<String>,
722 limit: usize,
723 filters: ScheduleQueryFilters,
724 order: ScheduleQueryOrder,
725 ) -> eyre::Result<ScheduleQueryResult> {
726 let cursor: Option<schedule_query::Cursor> = match cursor {
727 Some(cursor) => serde_json::from_str(&cursor).wrap_err("invalid cursor")?,
728 None => None,
729 };
730
731 Ok(self.query_schedules_impl(cursor, limit, order, filters))
732 }
733
734 async fn query_schedule_ids(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>> {
735 Ok(self.query_schedule_ids_impl(filters))
736 }
737
738 async fn count_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<u64> {
739 Ok(self.count_schedules_impl(filters))
740 }
741
742 async fn delete_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>> {
743 let schedule_ids = self
744 .query_schedule_ids_impl(filters)
745 .into_iter()
746 .collect::<IndexSet<_>>();
747
748 self.schedulable_schedules
750 .write()
751 .retain(|id, _| !schedule_ids.contains(id));
752 self.unschedulable_schedules
753 .write()
754 .retain(|id, _| !schedule_ids.contains(id));
755
756 Ok(schedule_ids.into_iter().collect())
757 }
758}
759
760impl MemoryStorage {
761 fn executions_by_job_id(&self, job_id: Uuid) -> impl ExactSizeIterator<Item = Uuid> {
763 let mut execution_ids = self
764 .pending_executions
765 .read()
766 .values()
767 .chain(self.ready_executions.read().values())
768 .chain(self.assigned_executions.read().values())
769 .chain(self.started_executions.read().values())
770 .chain(self.succeeded_executions.read().values())
771 .chain(self.failed_executions.read().values())
772 .filter(move |execution| execution.job_id == job_id)
773 .map(|execution| execution.id)
774 .collect::<Vec<_>>();
775
776 execution_ids.sort_unstable();
777
778 execution_ids.into_iter()
779 }
780
781 fn last_target_execution_time(&self, schedule_id: Uuid) -> Option<SystemTime> {
783 self.schedulable_jobs
784 .read()
785 .values()
786 .filter(|job| job.schedule_id == Some(schedule_id))
787 .map(|job| job.target_execution_time)
788 .max()
789 .max(
790 self.unschedulable_jobs
791 .read()
792 .values()
793 .filter(|job| job.schedule_id == Some(schedule_id))
794 .map(|job| job.target_execution_time)
795 .max(),
796 )
797 }
798}
799
800#[derive(Debug, Clone)]
801struct Job {
802 id: Uuid,
803 schedule_id: Option<Uuid>,
804 created_at: SystemTime,
805 job_type_id: String,
806 target_execution_time: SystemTime,
807 retry_policy: JobRetryPolicy,
808 timeout_policy: JobTimeoutPolicy,
809 labels: IndexMap<String, String>,
810 marked_unschedulable_at: Option<SystemTime>,
811 cancelled_at: Option<SystemTime>,
812 input_payload_json: String,
813 metadata_json: Option<String>,
814}
815
816impl From<NewJob> for Job {
817 fn from(job: NewJob) -> Self {
818 Self {
819 id: job.id,
820 schedule_id: job.schedule_id,
821 created_at: job.created_at,
822 job_type_id: job.job_type_id,
823 target_execution_time: job.target_execution_time,
824 retry_policy: job.retry_policy,
825 timeout_policy: job.timeout_policy,
826 labels: job.labels,
827 input_payload_json: job.input_payload_json,
828 marked_unschedulable_at: None,
829 cancelled_at: None,
830 metadata_json: job.metadata_json,
831 }
832 }
833}
834
835#[derive(Debug, Clone)]
836struct Execution {
837 id: Uuid,
838 job_id: Uuid,
839 target_execution_time: SystemTime,
840 executor_id: Option<Uuid>,
841 created_at: SystemTime,
842 ready_at: Option<SystemTime>,
843 assigned_at: Option<SystemTime>,
844 started_at: Option<SystemTime>,
845 succeeded_at: Option<SystemTime>,
846 failed_at: Option<SystemTime>,
847 output_payload_json: Option<String>,
848 failure_reason: Option<String>,
849}
850
851impl From<&Execution> for ExecutionDetails {
852 fn from(value: &Execution) -> Self {
853 Self {
854 id: value.id,
855 job_id: value.job_id,
856 executor_id: value.executor_id,
857 status: if value.succeeded_at.is_some() {
858 JobExecutionStatus::Succeeded
859 } else if value.failed_at.is_some() {
860 JobExecutionStatus::Failed
861 } else if value.started_at.is_some() {
862 JobExecutionStatus::Running
863 } else if value.assigned_at.is_some() {
864 JobExecutionStatus::Assigned
865 } else if value.ready_at.is_some() {
866 JobExecutionStatus::Ready
867 } else {
868 JobExecutionStatus::Pending
869 },
870 created_at: value.created_at,
871 ready_at: value.ready_at,
872 assigned_at: value.assigned_at,
873 started_at: value.started_at,
874 succeeded_at: value.succeeded_at,
875 failed_at: value.failed_at,
876 output_payload_json: value.output_payload_json.clone(),
877 failure_reason: value.failure_reason.clone(),
878 }
879 }
880}
881
882#[derive(Debug, Clone)]
883struct Schedule {
884 id: Uuid,
885 created_at: SystemTime,
886 job_type_id: Option<String>,
887 labels: IndexMap<String, String>,
888 marked_unschedulable_at: Option<SystemTime>,
889 cancelled_at: Option<SystemTime>,
890 job_timing_policy: ScheduleJobTimingPolicy,
891 job_creation_policy: ScheduleJobCreationPolicy,
892 time_range: Option<ScheduleTimeRange>,
893 metadata_json: Option<String>,
894}
895
896impl From<NewSchedule> for Schedule {
897 fn from(schedule: NewSchedule) -> Self {
898 Self {
899 id: schedule.id,
900 created_at: schedule.created_at,
901 job_type_id: match &schedule.job_creation_policy {
902 ScheduleJobCreationPolicy::JobDefinition(schedule_new_job_definition) => {
903 Some(schedule_new_job_definition.job_type_id.clone())
904 }
905 },
906 labels: schedule.labels,
907 marked_unschedulable_at: None,
908 cancelled_at: None,
909 job_timing_policy: schedule.job_timing_policy,
910 job_creation_policy: schedule.job_creation_policy,
911 time_range: schedule.time_range,
912 metadata_json: schedule.metadata_json,
913 }
914 }
915}