1use async_trait::async_trait;
4use futures::stream::BoxStream;
5use ora_proto::{
6 common::{
7 self,
8 v1::{
9 self, schedule_job_creation_policy::JobCreation, schedule_job_timing_policy,
10 JobDefinition, TimeRange,
11 },
12 },
13 server::{self, v1::Job},
14 snapshot,
15};
16use serde::{Deserialize, Serialize};
17use std::time::{Duration, SystemTime};
18use tonic::Status;
19use uuid::Uuid;
20
/// An insertion-ordered map ([`indexmap::IndexMap`]) that hashes with `ahash::RandomState`.
pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
/// An insertion-ordered set ([`indexmap::IndexSet`]) that hashes with `ahash::RandomState`.
pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;
25
/// Storage backend interface for job types, jobs, executions and schedules.
///
/// Every method is asynchronous and fallible (`eyre::Result`). Implementors must be
/// `Send + Sync + 'static + Clone`.
///
/// NOTE(review): many method names read like events that already happened
/// (`jobs_added`, `execution_started`, ...) and that the backend is expected to record —
/// only the signatures are visible here, so confirm the exact contract against the
/// implementations.
#[async_trait]
pub trait Storage: Send + Sync + 'static + Clone {
    /// Records the given job types.
    async fn job_types_added(&self, job_types: Vec<JobType>) -> eyre::Result<()>;
    /// Records the given new jobs.
    async fn jobs_added(&self, jobs: Vec<NewJob>) -> eyre::Result<()>;
    /// Adds `job` conditionally and reports whether it was added or a job already exists.
    ///
    /// NOTE(review): presumably the job is only added when no existing job matches
    /// `filters` — confirm with the implementations.
    async fn job_added_conditionally(
        &self,
        job: NewJob,
        filters: JobQueryFilters,
    ) -> eyre::Result<ConditionalJobResult>;
    /// Records the cancellation of the given jobs at `timestamp` and returns one
    /// [`CancelledJob`] entry per job reported as cancelled.
    async fn jobs_cancelled(
        &self,
        job_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<CancelledJob>>;
    /// Records the given new executions at `timestamp`.
    async fn executions_added(
        &self,
        executions: Vec<NewExecution>,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Records that the given executions became ready at `timestamp`.
    async fn executions_ready(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Records that the execution was assigned to the executor at `timestamp`.
    async fn execution_assigned(
        &self,
        execution_id: Uuid,
        executor_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Records that the execution started at `timestamp`.
    async fn execution_started(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Records that the execution succeeded at `timestamp` with the given JSON output.
    async fn execution_succeeded(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
        output_payload_json: String,
    ) -> eyre::Result<()>;
    /// Records that the given executions failed at `timestamp` for `reason`.
    ///
    /// NOTE(review): `mark_job_unschedulable` presumably also flags the owning jobs as
    /// unschedulable so that no retries are created — confirm.
    async fn executions_failed(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
        reason: String,
        mark_job_unschedulable: bool,
    ) -> eyre::Result<()>;

    /// Returns execution IDs considered orphaned with respect to `executor_ids`.
    ///
    /// NOTE(review): whether `executor_ids` lists the executors that are still alive or
    /// the ones that are gone is not visible here — confirm.
    async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>>;

    /// Records that the given jobs became unschedulable at `timestamp`.
    async fn jobs_unschedulable(&self, job_ids: &[Uuid], timestamp: SystemTime)
        -> eyre::Result<()>;
    /// Returns pending executions.
    ///
    /// NOTE(review): `after` looks like a paging cursor (only items after this ID) —
    /// confirm ordering and page size with the implementations.
    async fn pending_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingExecution>>;
    /// Returns ready executions; `after` as in [`Storage::pending_executions`].
    async fn ready_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<ReadyExecution>>;
    /// Returns pending jobs; `after` as in [`Storage::pending_executions`].
    async fn pending_jobs(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingJob>>;

    /// Queries jobs matching `filters`, in the given `order`, returning at most a page
    /// described by `cursor` and `limit`.
    ///
    /// Note that the parameter order here is `order, filters`, whereas
    /// [`Storage::query_schedules`] takes `filters, order`.
    async fn query_jobs(
        &self,
        cursor: Option<String>,
        limit: usize,
        order: JobQueryOrder,
        filters: JobQueryFilters,
    ) -> eyre::Result<JobQueryResult>;

    /// Returns the IDs of the jobs matching `filters`.
    async fn query_job_ids(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Returns the number of jobs matching `filters`.
    async fn count_jobs(&self, filters: JobQueryFilters) -> eyre::Result<u64>;

    /// Returns the stored job types.
    async fn query_job_types(&self) -> eyre::Result<Vec<JobType>>;

    /// Deletes jobs matching `filters` and returns a list of job IDs
    /// (presumably the deleted ones — confirm).
    async fn delete_jobs(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Records the given new schedules.
    async fn schedules_added(&self, schedules: Vec<NewSchedule>) -> eyre::Result<()>;

    /// Adds `schedule` conditionally and reports whether it was added or a schedule
    /// already exists.
    ///
    /// NOTE(review): presumably the schedule is only added when no existing schedule
    /// matches `filters` — confirm with the implementations.
    async fn schedule_added_conditionally(
        &self,
        schedule: NewSchedule,
        filters: ScheduleQueryFilters,
    ) -> eyre::Result<ConditionalScheduleResult>;

    /// Records the cancellation of the given schedules at `timestamp` and returns one
    /// [`CancelledSchedule`] entry per schedule reported as cancelled.
    async fn schedules_cancelled(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<CancelledSchedule>>;

    /// Records that the given schedules became unschedulable at `timestamp`.
    async fn schedules_unschedulable(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()>;

    /// Returns pending schedules; `after` as in [`Storage::pending_executions`].
    async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>>;

    /// Queries schedules matching `filters`, in the given `order`, returning at most a
    /// page described by `cursor` and `limit`.
    async fn query_schedules(
        &self,
        cursor: Option<String>,
        limit: usize,
        filters: ScheduleQueryFilters,
        order: ScheduleQueryOrder,
    ) -> eyre::Result<ScheduleQueryResult>;

    /// Returns the IDs of the schedules matching `filters`.
    async fn query_schedule_ids(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Returns the number of schedules matching `filters`.
    async fn count_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<u64>;

    /// Deletes schedules matching `filters` and returns a list of schedule IDs
    /// (presumably the deleted ones — confirm).
    async fn delete_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;
}
243
/// Snapshot export/import support for a storage backend.
#[async_trait]
pub trait StorageSnapshot {
    /// Returns a stream of snapshot data chunks; each item may be an error.
    fn export_snapshot(&self) -> BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>;

    /// Consumes a stream of snapshot data chunks and imports them.
    ///
    /// NOTE(review): whether existing data is replaced or merged is not visible here —
    /// confirm with the implementations.
    async fn import_snapshot(
        &self,
        snapshot: BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>,
    ) -> eyre::Result<()>;
}
261
/// A job to be added to storage (see [`Storage::jobs_added`]).
#[derive(Debug, Clone)]
pub struct NewJob {
    /// ID of the job.
    pub id: Uuid,
    /// ID of the schedule associated with the job, if any.
    pub schedule_id: Option<Uuid>,
    /// Creation time of the job.
    pub created_at: SystemTime,
    /// ID of the job's type.
    pub job_type_id: String,
    /// Target execution time of the job.
    pub target_execution_time: SystemTime,
    /// Input payload as a JSON string.
    pub input_payload_json: String,
    /// Timeout policy of the job.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy of the job.
    pub retry_policy: JobRetryPolicy,
    /// Labels (key → value) attached to the job.
    pub labels: IndexMap<String, String>,
    /// Optional metadata as a JSON string.
    pub metadata_json: Option<String>,
}
286
287pub enum ConditionalJobResult {
289 Added,
291 AlreadyExists {
293 job_id: Uuid,
295 },
296}
297
/// A job returned by [`Storage::pending_jobs`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingJob {
    /// ID of the job.
    pub id: Uuid,
    /// Target execution time of the job.
    pub target_execution_time: SystemTime,
    /// Number of executions of the job.
    /// NOTE(review): presumably the executions created so far — confirm.
    pub execution_count: u64,
    /// Retry policy of the job.
    pub retry_policy: JobRetryPolicy,
    /// Timeout policy of the job.
    pub timeout_policy: JobTimeoutPolicy,
}
312
/// Timeout policy of a job.
///
/// The default is no timeout with [`JobTimeoutBaseTime::StartTime`] as base time.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct JobTimeoutPolicy {
    /// The timeout duration, or `None` when no timeout is set.
    pub timeout: Option<Duration>,
    /// The point in time the timeout is measured from.
    pub base_time: JobTimeoutBaseTime,
}
323
324impl From<v1::JobTimeoutPolicy> for JobTimeoutPolicy {
325 fn from(proto: v1::JobTimeoutPolicy) -> Self {
326 Self {
327 timeout: proto.timeout.and_then(|d| d.try_into().ok()),
328 base_time: proto.base_time().into(),
329 }
330 }
331}
332
333impl From<JobTimeoutPolicy> for v1::JobTimeoutPolicy {
334 fn from(policy: JobTimeoutPolicy) -> Self {
335 Self {
336 timeout: policy.timeout.and_then(|d| d.try_into().ok()),
337 base_time: v1::JobTimeoutBaseTime::from(policy.base_time).into(),
338 }
339 }
340}
341
342impl From<v1::JobTimeoutBaseTime> for JobTimeoutBaseTime {
343 fn from(proto: v1::JobTimeoutBaseTime) -> Self {
344 match proto {
345 v1::JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
346 v1::JobTimeoutBaseTime::StartTime | v1::JobTimeoutBaseTime::Unspecified => {
347 Self::StartTime
348 }
349 }
350 }
351}
352
353impl From<JobTimeoutBaseTime> for v1::JobTimeoutBaseTime {
354 fn from(base_time: JobTimeoutBaseTime) -> Self {
355 match base_time {
356 JobTimeoutBaseTime::StartTime => Self::StartTime,
357 JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
358 }
359 }
360}
361
/// The point in time a job timeout is measured from.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum JobTimeoutBaseTime {
    /// Measure from the start time (the default; also used for an unspecified
    /// protobuf value).
    #[default]
    StartTime,
    /// Measure from the target execution time.
    TargetExecutionTime,
}
377
/// Retry policy of a job. The default is zero retries.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct JobRetryPolicy {
    /// Number of retries.
    pub retries: u64,
}
386
387impl From<v1::JobRetryPolicy> for JobRetryPolicy {
388 fn from(proto: v1::JobRetryPolicy) -> Self {
389 Self {
390 retries: proto.retries,
391 }
392 }
393}
394
395impl From<JobRetryPolicy> for v1::JobRetryPolicy {
396 fn from(policy: JobRetryPolicy) -> Self {
397 Self {
398 retries: policy.retries,
399 }
400 }
401}
402
/// An execution to be added to storage (see [`Storage::executions_added`]).
#[derive(Debug, Clone)]
pub struct NewExecution {
    /// ID of the execution.
    pub id: Uuid,
    /// ID of the job the execution belongs to.
    pub job_id: Uuid,
    /// Attempt number of the execution.
    /// NOTE(review): whether counting starts at 0 or 1 is not visible here — confirm.
    pub attempt_number: u64,
    /// Target execution time.
    pub target_execution_time: SystemTime,
}
415
/// An execution returned by [`Storage::pending_executions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingExecution {
    /// ID of the execution.
    pub id: Uuid,
    /// Target execution time.
    pub target_execution_time: SystemTime,
}
424
/// An execution returned by [`Storage::ready_executions`], together with the job
/// data carried alongside it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadyExecution {
    /// ID of the execution.
    pub id: Uuid,
    /// ID of the job the execution belongs to.
    pub job_id: Uuid,
    /// Input payload as a JSON string.
    pub input_payload_json: String,
    /// Attempt number of the execution.
    pub attempt_number: u64,
    /// ID of the job's type.
    pub job_type_id: String,
    /// Target execution time.
    pub target_execution_time: SystemTime,
    /// Timeout policy.
    pub timeout_policy: JobTimeoutPolicy,
}
443
/// A job type as kept by the storage.
///
/// When converted to the protobuf job type, an empty `name` or `description` is
/// emitted as an absent value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobType {
    /// ID of the job type.
    pub id: String,
    /// Name of the job type (may be empty).
    pub name: String,
    /// Description of the job type (may be empty).
    pub description: String,
    /// Optional input schema as a JSON string.
    pub input_schema_json: Option<String>,
    /// Optional output schema as a JSON string.
    pub output_schema_json: Option<String>,
}
458
/// Filters for job queries. A field set to `None` applies no restriction; the default
/// value has every field unset.
///
/// NOTE(review): how the individual filters combine (presumably logical AND) and
/// whether the time bounds are inclusive is decided by the storage implementations —
/// confirm there.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct JobQueryFilters {
    /// Restrict to these job IDs.
    pub job_ids: Option<IndexSet<Uuid>>,
    /// Restrict to these job type IDs.
    pub job_type_ids: Option<IndexSet<String>>,
    /// Restrict by execution IDs.
    pub execution_ids: Option<IndexSet<Uuid>>,
    /// Restrict by schedule IDs.
    pub schedule_ids: Option<IndexSet<Uuid>>,
    /// Restrict by execution status.
    pub execution_status: Option<IndexSet<JobExecutionStatus>>,
    /// Restrict by labels (label key → required condition).
    pub labels: Option<IndexMap<String, JobLabelFilterValue>>,
    /// Restrict by the `active` flag.
    pub active: Option<bool>,
    /// Lower bound on the creation time.
    pub created_after: Option<SystemTime>,
    /// Upper bound on the creation time.
    pub created_before: Option<SystemTime>,
    /// Lower bound on the target execution time.
    pub target_execution_time_after: Option<SystemTime>,
    /// Upper bound on the target execution time.
    pub target_execution_time_before: Option<SystemTime>,
}
485
/// Ordering of job query results.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum JobQueryOrder {
    /// By creation time, ascending.
    CreatedAtAsc,
    /// By creation time, descending (also used for an unspecified protobuf order).
    CreatedAtDesc,
    /// By target execution time, ascending.
    TargetExecutionTimeAsc,
    /// By target execution time, descending.
    TargetExecutionTimeDesc,
}
498
/// One page of results returned by [`Storage::query_jobs`].
#[derive(Debug, Clone)]
pub struct JobQueryResult {
    /// The jobs in this page.
    pub jobs: Vec<JobDetails>,
    /// Cursor value, if any.
    /// NOTE(review): presumably passed back as `cursor` to fetch the next page — confirm.
    pub cursor: Option<String>,
    /// Whether more results are available.
    pub has_more: bool,
}
509
/// Status of a job execution; mirrors the protobuf `JobExecutionStatus` variants
/// except for `Unspecified`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobExecutionStatus {
    /// The execution is pending.
    Pending,
    /// The execution is ready.
    Ready,
    /// The execution is assigned.
    Assigned,
    /// The execution is running.
    Running,
    /// The execution succeeded.
    Succeeded,
    /// The execution failed.
    Failed,
}
526
/// Condition a job label filter places on a label.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobLabelFilterValue {
    /// The label must exist (any value).
    Exists,
    /// The label must equal the given value.
    Equals(String),
}
535
/// Full details of a job as returned by job queries; convertible into the protobuf
/// `Job` message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobDetails {
    /// Whether the job is active.
    pub active: bool,
    /// Whether the job was cancelled.
    pub cancelled: bool,
    /// ID of the job.
    pub id: Uuid,
    /// ID of the job's type.
    pub job_type_id: String,
    /// ID of the schedule associated with the job, if any.
    pub schedule_id: Option<Uuid>,
    /// Target execution time.
    pub target_execution_time: SystemTime,
    /// Input payload as a JSON string.
    pub input_payload_json: String,
    /// Labels (key → value) attached to the job.
    pub labels: IndexMap<String, String>,
    /// Timeout policy.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy.
    pub retry_policy: JobRetryPolicy,
    /// Creation time.
    pub created_at: SystemTime,
    /// Executions of the job.
    pub executions: Vec<ExecutionDetails>,
    /// Optional metadata as a JSON string.
    pub metadata_json: Option<String>,
}
568
/// Full details of a single job execution; convertible into the protobuf
/// `JobExecution` message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionDetails {
    /// ID of the execution.
    pub id: Uuid,
    /// ID of the job the execution belongs to.
    pub job_id: Uuid,
    /// ID of the executor, if any.
    pub executor_id: Option<Uuid>,
    /// Current status.
    pub status: JobExecutionStatus,
    /// Creation time.
    pub created_at: SystemTime,
    /// Time the execution became ready, if recorded.
    pub ready_at: Option<SystemTime>,
    /// Time the execution was assigned, if recorded.
    pub assigned_at: Option<SystemTime>,
    /// Time the execution started, if recorded.
    pub started_at: Option<SystemTime>,
    /// Time the execution succeeded, if recorded.
    pub succeeded_at: Option<SystemTime>,
    /// Time the execution failed, if recorded.
    pub failed_at: Option<SystemTime>,
    /// Output payload as a JSON string, if any.
    pub output_payload_json: Option<String>,
    /// Failure reason, if any.
    pub failure_reason: Option<String>,
}
598
/// An entry returned by [`Storage::jobs_cancelled`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancelledJob {
    /// ID of the job.
    pub id: Uuid,
    /// ID of the job's active execution, if any.
    /// NOTE(review): presumably the execution that was in progress when the job was
    /// cancelled — confirm.
    pub active_execution: Option<Uuid>,
}
607
/// A schedule to be added to storage (see [`Storage::schedules_added`]).
#[derive(Debug, Clone)]
pub struct NewSchedule {
    /// ID of the schedule.
    pub id: Uuid,
    /// Creation time.
    pub created_at: SystemTime,
    /// Policy that determines the timing of jobs.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy that determines how jobs are created.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// Labels (key → value) attached to the schedule.
    pub labels: IndexMap<String, String>,
    /// Optional time range of the schedule.
    pub time_range: Option<ScheduleTimeRange>,
    /// Optional metadata as a JSON string.
    pub metadata_json: Option<String>,
}
626
627pub enum ConditionalScheduleResult {
629 Added,
631 AlreadyExists {
633 schedule_id: Uuid,
635 },
636}
637
/// Timing policy of a schedule.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleJobTimingPolicy {
    /// Fixed-interval timing.
    Repeat(SchedulingPolicyRepeat),
    /// Cron-expression based timing.
    Cron(SchedulingPolicyCron),
}
646
/// Fixed-interval timing policy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulingPolicyRepeat {
    /// The repeat interval.
    pub interval: Duration,
    /// The `immediate` flag of the policy.
    /// NOTE(review): presumably "create the first job right away" — confirm.
    pub immediate: bool,
    /// How missed times are handled.
    pub missed_policy: ScheduleMissedTimePolicy,
}
657
/// Cron-expression based timing policy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulingPolicyCron {
    /// The cron expression. It is validated when converting from the protobuf policy
    /// and kept as the original string.
    pub cron_expression: String,
    /// The `immediate` flag of the policy.
    /// NOTE(review): presumably "create the first job right away" — confirm.
    pub immediate: bool,
    /// How missed times are handled.
    pub missed_policy: ScheduleMissedTimePolicy,
}
668
/// Policy for times a schedule has missed.
///
/// NOTE(review): the exact meaning of "missed" is defined by the scheduler, not here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleMissedTimePolicy {
    /// Skip missed times (also used for an unspecified protobuf value).
    Skip,
    /// Create jobs for missed times.
    Create,
}
677
/// Policy that determines how a schedule creates jobs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleJobCreationPolicy {
    /// Create jobs from a fixed job definition.
    JobDefinition(ScheduleNewJobDefinition),
}
684
/// Job definition used by a schedule to create jobs.
///
/// Unlike the protobuf `JobDefinition`, it carries no target execution time and no
/// metadata; both are emitted as absent when converting back to protobuf.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleNewJobDefinition {
    /// ID of the job type.
    pub job_type_id: String,
    /// Input payload as a JSON string.
    pub input_payload_json: String,
    /// Timeout policy.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy.
    pub retry_policy: JobRetryPolicy,
    /// Labels (key → value).
    pub labels: IndexMap<String, String>,
}
699
/// An entry returned by [`Storage::schedules_cancelled`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancelledSchedule {
    /// ID of the schedule.
    pub id: Uuid,
}
706
/// A schedule returned by [`Storage::pending_schedules`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingSchedule {
    /// ID of the schedule.
    pub id: Uuid,
    /// Policy that determines the timing of jobs.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy that determines how jobs are created.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// The last target execution time, if any.
    /// NOTE(review): presumably that of the most recent job created by the schedule —
    /// confirm.
    pub last_target_execution_time: Option<SystemTime>,
    /// Optional time range of the schedule.
    pub time_range: Option<ScheduleTimeRange>,
}
722
/// Filters for schedule queries. A field set to `None` applies no restriction; the
/// default value has every field unset.
///
/// NOTE(review): how the individual filters combine (presumably logical AND) and
/// whether the time bounds are inclusive is decided by the storage implementations —
/// confirm there.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ScheduleQueryFilters {
    /// Restrict to these schedule IDs.
    pub schedule_ids: Option<IndexSet<Uuid>>,
    /// Restrict by job IDs.
    pub job_ids: Option<IndexSet<Uuid>>,
    /// Restrict by job type IDs.
    pub job_type_ids: Option<IndexSet<String>>,
    /// Restrict by labels (label key → required condition).
    pub labels: Option<IndexMap<String, ScheduleLabelFilterValue>>,
    /// Restrict by the `active` flag.
    pub active: Option<bool>,
    /// Lower bound on the creation time.
    pub created_after: Option<SystemTime>,
    /// Upper bound on the creation time.
    pub created_before: Option<SystemTime>,
}
741
/// Ordering of schedule query results.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ScheduleQueryOrder {
    /// By creation time, ascending.
    CreatedAtAsc,
    /// By creation time, descending (also used for an unspecified protobuf order).
    CreatedAtDesc,
}
750
/// One page of results returned by [`Storage::query_schedules`].
#[derive(Debug, Clone)]
pub struct ScheduleQueryResult {
    /// The schedules in this page.
    pub schedules: Vec<ScheduleDetails>,
    /// Cursor value, if any.
    /// NOTE(review): presumably passed back as `cursor` to fetch the next page — confirm.
    pub cursor: Option<String>,
    /// Whether more results are available.
    pub has_more: bool,
}
761
/// Full details of a schedule as returned by schedule queries; convertible into the
/// protobuf `Schedule` message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleDetails {
    /// ID of the schedule.
    pub id: Uuid,
    /// Creation time.
    pub created_at: SystemTime,
    /// Policy that determines the timing of jobs.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy that determines how jobs are created.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// Labels (key → value) attached to the schedule.
    pub labels: IndexMap<String, String>,
    /// Whether the schedule is active.
    pub active: bool,
    /// Whether the schedule was cancelled.
    pub cancelled: bool,
    /// Optional time range of the schedule.
    pub time_range: Option<ScheduleTimeRange>,
    /// Optional metadata as a JSON string.
    pub metadata_json: Option<String>,
}
788
/// Condition a schedule label filter places on a label.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ScheduleLabelFilterValue {
    /// The label must exist (any value).
    Exists,
    /// The label must equal the given value.
    Equals(String),
}
797
/// A half-open time range `[start, end)`; a missing bound means "unbounded" on that side.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScheduleTimeRange {
    /// Inclusive lower bound, if any.
    pub start: Option<SystemTime>,
    /// Exclusive upper bound, if any.
    pub end: Option<SystemTime>,
}

impl ScheduleTimeRange {
    /// Returns whether `time` lies within the range.
    ///
    /// The start bound is inclusive and the end bound is exclusive.
    #[must_use]
    pub fn contains(&self, time: SystemTime) -> bool {
        let at_or_after_start = match self.start {
            Some(start) => time >= start,
            None => true,
        };
        let before_end = match self.end {
            Some(end) => time < end,
            None => true,
        };

        at_or_after_start && before_end
    }

    /// Returns whether the range is well-formed: when both bounds are present the
    /// start must be strictly before the end; otherwise the range is always valid.
    #[must_use]
    pub fn is_valid(&self) -> bool {
        match (self.start, self.end) {
            (Some(start), Some(end)) => start < end,
            _ => true,
        }
    }
}
834
835impl From<ScheduleJobTimingPolicy> for common::v1::ScheduleJobTimingPolicy {
836 fn from(value: ScheduleJobTimingPolicy) -> Self {
837 match value {
838 ScheduleJobTimingPolicy::Repeat(policy) => common::v1::ScheduleJobTimingPolicy {
839 job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
840 common::v1::ScheduleJobTimingPolicyRepeat {
841 interval: Some(policy.interval.try_into().unwrap()),
842 immediate: policy.immediate,
843 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
844 policy.missed_policy,
845 )
846 .into(),
847 },
848 )),
849 },
850 ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
851 job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
852 common::v1::ScheduleJobTimingPolicyCron {
853 cron_expression: policy.cron_expression,
854 immediate: policy.immediate,
855 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
856 policy.missed_policy,
857 )
858 .into(),
859 },
860 )),
861 },
862 }
863 }
864}
865
866impl TryFrom<common::v1::ScheduleJobTimingPolicy> for ScheduleJobTimingPolicy {
867 type Error = Status;
868
869 fn try_from(value: common::v1::ScheduleJobTimingPolicy) -> Result<Self, Self::Error> {
870 let value = value.job_timing.ok_or_else(|| {
871 Status::invalid_argument("missing job timing policy in schedule definition")
872 })?;
873
874 match value {
875 schedule_job_timing_policy::JobTiming::Repeat(policy) => {
876 Ok(Self::Repeat(SchedulingPolicyRepeat {
877 interval: policy
878 .interval
879 .ok_or_else(|| {
880 Status::invalid_argument("missing interval in repeat policy")
881 })?
882 .try_into()
883 .map_err(|_| Status::invalid_argument("invalid interval"))?,
884 immediate: policy.immediate,
885 missed_policy: policy.missed_time_policy().into(),
886 }))
887 }
888 schedule_job_timing_policy::JobTiming::Cron(policy) => {
889 Ok(Self::Cron(SchedulingPolicyCron {
890 missed_policy: policy.missed_time_policy().into(),
891 cron_expression: {
892 let mut parse_options = cronexpr::ParseOptions::default();
893 parse_options.fallback_timezone_option =
894 cronexpr::FallbackTimezoneOption::UTC;
895
896 cronexpr::parse_crontab_with(&policy.cron_expression, parse_options)
898 .map_err(|err| {
899 Status::invalid_argument(format!("invalid cron expression: {err}"))
900 })?;
901
902 policy.cron_expression
903 },
904 immediate: policy.immediate,
905 }))
906 }
907 }
908 }
909}
910
911impl From<ScheduleJobCreationPolicy> for common::v1::ScheduleJobCreationPolicy {
912 fn from(value: ScheduleJobCreationPolicy) -> Self {
913 match value {
914 ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
915 common::v1::ScheduleJobCreationPolicy {
916 job_creation: Some(JobCreation::JobDefinition(common::v1::JobDefinition {
917 job_type_id: job_definition.job_type_id,
918 input_payload_json: job_definition.input_payload_json,
919 target_execution_time: None,
920 retry_policy: Some(common::v1::JobRetryPolicy {
921 retries: job_definition.retry_policy.retries,
922 }),
923 timeout_policy: Some(common::v1::JobTimeoutPolicy {
924 timeout: job_definition
925 .timeout_policy
926 .timeout
927 .and_then(|d| d.try_into().ok()),
928 base_time: match job_definition.timeout_policy.base_time {
929 JobTimeoutBaseTime::StartTime => {
930 common::v1::JobTimeoutBaseTime::StartTime.into()
931 }
932 JobTimeoutBaseTime::TargetExecutionTime => {
933 common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
934 }
935 },
936 }),
937 labels: job_definition
938 .labels
939 .into_iter()
940 .map(|(key, value)| common::v1::JobLabel { key, value })
941 .collect(),
942 metadata_json: None,
943 })),
944 }
945 }
946 }
947 }
948}
949
950impl TryFrom<common::v1::ScheduleJobCreationPolicy> for ScheduleJobCreationPolicy {
951 type Error = Status;
952
953 fn try_from(value: common::v1::ScheduleJobCreationPolicy) -> Result<Self, Self::Error> {
954 let value = value.job_creation.ok_or_else(|| {
955 Status::invalid_argument("missing job creation policy in schedule definition")
956 })?;
957
958 match value {
959 JobCreation::JobDefinition(job_definition) => {
960 Ok(Self::JobDefinition(ScheduleNewJobDefinition {
961 job_type_id: job_definition.job_type_id,
962 input_payload_json: job_definition.input_payload_json,
963 timeout_policy: job_definition
964 .timeout_policy
965 .map(Into::into)
966 .unwrap_or_default(),
967 retry_policy: job_definition
968 .retry_policy
969 .map(Into::into)
970 .unwrap_or_default(),
971 labels: job_definition
972 .labels
973 .into_iter()
974 .map(|label| (label.key, label.value))
975 .collect(),
976 }))
977 }
978 }
979 }
980}
981
982impl From<ScheduleTimeRange> for TimeRange {
983 fn from(value: ScheduleTimeRange) -> Self {
984 Self {
985 start: value.start.map(Into::into),
986 end: value.end.map(Into::into),
987 }
988 }
989}
990
991impl TryFrom<TimeRange> for ScheduleTimeRange {
992 type Error = Status;
993
994 fn try_from(value: TimeRange) -> Result<Self, Self::Error> {
995 Ok(Self {
996 start: value
997 .start
998 .map(SystemTime::try_from)
999 .transpose()
1000 .map_err(|error| {
1001 Status::invalid_argument(format!("unsupported timestamp: {error}"))
1002 })?,
1003 end: value
1004 .end
1005 .map(SystemTime::try_from)
1006 .transpose()
1007 .map_err(|error| {
1008 Status::invalid_argument(format!("unsupported timestamp: {error}"))
1009 })?,
1010 })
1011 }
1012}
1013
1014impl From<common::v1::ScheduleMissedTimePolicy> for ScheduleMissedTimePolicy {
1015 fn from(value: common::v1::ScheduleMissedTimePolicy) -> Self {
1016 match value {
1017 common::v1::ScheduleMissedTimePolicy::Skip
1018 | common::v1::ScheduleMissedTimePolicy::Unspecified => Self::Skip,
1019 common::v1::ScheduleMissedTimePolicy::Create => Self::Create,
1020 }
1021 }
1022}
1023
1024impl From<ScheduleMissedTimePolicy> for common::v1::ScheduleMissedTimePolicy {
1025 fn from(value: ScheduleMissedTimePolicy) -> Self {
1026 match value {
1027 ScheduleMissedTimePolicy::Skip => Self::Skip,
1028 ScheduleMissedTimePolicy::Create => Self::Create,
1029 }
1030 }
1031}
1032
1033impl From<server::v1::JobQueryOrder> for JobQueryOrder {
1034 fn from(value: server::v1::JobQueryOrder) -> Self {
1035 match value {
1036 server::v1::JobQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
1037 server::v1::JobQueryOrder::CreatedAtDesc | server::v1::JobQueryOrder::Unspecified => {
1038 Self::CreatedAtDesc
1039 }
1040 server::v1::JobQueryOrder::TargetExecutionTimeAsc => Self::TargetExecutionTimeAsc,
1041 server::v1::JobQueryOrder::TargetExecutionTimeDesc => Self::TargetExecutionTimeDesc,
1042 }
1043 }
1044}
1045
1046impl From<JobDetails> for Job {
1047 fn from(job: JobDetails) -> Self {
1048 Job {
1049 id: job.id.to_string(),
1050 schedule_id: job.schedule_id.map(|t| t.to_string()),
1051 cancelled: job.cancelled,
1052 active: job.active,
1053 definition: Some(JobDefinition {
1054 job_type_id: job.job_type_id,
1055 input_payload_json: job.input_payload_json,
1056 target_execution_time: Some(job.target_execution_time.into()),
1057 retry_policy: Some(common::v1::JobRetryPolicy {
1058 retries: job.retry_policy.retries,
1059 }),
1060 timeout_policy: Some(common::v1::JobTimeoutPolicy {
1061 timeout: job.timeout_policy.timeout.and_then(|d| d.try_into().ok()),
1062 base_time: match job.timeout_policy.base_time {
1063 JobTimeoutBaseTime::StartTime => {
1064 common::v1::JobTimeoutBaseTime::StartTime.into()
1065 }
1066 JobTimeoutBaseTime::TargetExecutionTime => {
1067 common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
1068 }
1069 },
1070 }),
1071 labels: job
1072 .labels
1073 .into_iter()
1074 .map(|(key, value)| common::v1::JobLabel { key, value })
1075 .collect(),
1076 metadata_json: job.metadata_json,
1077 }),
1078 created_at: Some(job.created_at.into()),
1079 executions: job.executions.into_iter().map(Into::into).collect(),
1080 }
1081 }
1082}
1083
1084impl From<ExecutionDetails> for server::v1::JobExecution {
1085 fn from(value: ExecutionDetails) -> Self {
1086 Self {
1087 id: value.id.to_string(),
1088 job_id: value.job_id.to_string(),
1089 executor_id: value.executor_id.map(|t| t.to_string()),
1090 status: match value.status {
1091 JobExecutionStatus::Pending => server::v1::JobExecutionStatus::Pending,
1092 JobExecutionStatus::Ready => server::v1::JobExecutionStatus::Ready,
1093 JobExecutionStatus::Assigned => server::v1::JobExecutionStatus::Assigned,
1094 JobExecutionStatus::Running => server::v1::JobExecutionStatus::Running,
1095 JobExecutionStatus::Succeeded => server::v1::JobExecutionStatus::Succeeded,
1096 JobExecutionStatus::Failed => server::v1::JobExecutionStatus::Failed,
1097 }
1098 .into(),
1099 created_at: Some(value.created_at.into()),
1100 ready_at: value.ready_at.map(Into::into),
1101 assigned_at: value.assigned_at.map(Into::into),
1102 started_at: value.started_at.map(Into::into),
1103 succeeded_at: value.succeeded_at.map(Into::into),
1104 failed_at: value.failed_at.map(Into::into),
1105 output_payload_json: value.output_payload_json,
1106 failure_reason: value.failure_reason,
1107 }
1108 }
1109}
1110
1111impl From<JobType> for ora_proto::common::v1::JobType {
1112 fn from(value: JobType) -> Self {
1113 Self {
1114 id: value.id,
1115 name: if value.name.is_empty() {
1116 None
1117 } else {
1118 Some(value.name)
1119 },
1120 description: if value.description.is_empty() {
1121 None
1122 } else {
1123 Some(value.description)
1124 },
1125 input_schema_json: value.input_schema_json,
1126 output_schema_json: value.output_schema_json,
1127 }
1128 }
1129}
1130
1131impl TryFrom<server::v1::JobQueryFilter> for JobQueryFilters {
1132 type Error = Status;
1133
1134 fn try_from(filter: server::v1::JobQueryFilter) -> Result<Self, Self::Error> {
1135 Ok(Self {
1136 execution_status: {
1137 let status = filter
1138 .status()
1139 .filter_map(|status| match status {
1140 server::v1::JobExecutionStatus::Unspecified => None,
1141 server::v1::JobExecutionStatus::Pending => {
1142 Some(JobExecutionStatus::Pending)
1143 }
1144 server::v1::JobExecutionStatus::Ready => Some(JobExecutionStatus::Ready),
1145 server::v1::JobExecutionStatus::Assigned => {
1146 Some(JobExecutionStatus::Assigned)
1147 }
1148 server::v1::JobExecutionStatus::Running => {
1149 Some(JobExecutionStatus::Running)
1150 }
1151 server::v1::JobExecutionStatus::Succeeded => {
1152 Some(JobExecutionStatus::Succeeded)
1153 }
1154 server::v1::JobExecutionStatus::Failed => Some(JobExecutionStatus::Failed),
1155 })
1156 .collect::<IndexSet<_>>();
1157
1158 if status.is_empty() {
1159 None
1160 } else {
1161 Some(status)
1162 }
1163 },
1164 job_ids: if filter.job_ids.is_empty() {
1165 None
1166 } else {
1167 Some(
1168 filter
1169 .job_ids
1170 .iter()
1171 .map(|f| {
1172 f.parse().map_err(|err| {
1173 Status::invalid_argument(format!("invalid job ID: {err}"))
1174 })
1175 })
1176 .collect::<Result<_, _>>()?,
1177 )
1178 },
1179 job_type_ids: if filter.job_type_ids.is_empty() {
1180 None
1181 } else {
1182 Some(filter.job_type_ids.into_iter().collect())
1183 },
1184 execution_ids: if filter.execution_ids.is_empty() {
1185 None
1186 } else {
1187 Some(
1188 filter
1189 .execution_ids
1190 .iter()
1191 .map(|f| {
1192 f.parse().map_err(|err| {
1193 Status::invalid_argument(format!("invalid execution ID: {err}"))
1194 })
1195 })
1196 .collect::<Result<_, _>>()?,
1197 )
1198 },
1199 schedule_ids: if filter.schedule_ids.is_empty() {
1200 None
1201 } else {
1202 Some(
1203 filter
1204 .schedule_ids
1205 .iter()
1206 .map(|f| {
1207 f.parse().map_err(|err| {
1208 Status::invalid_argument(format!("invalid schedule ID: {err}"))
1209 })
1210 })
1211 .collect::<Result<_, _>>()?,
1212 )
1213 },
1214 labels: if filter.labels.is_empty() {
1215 None
1216 } else {
1217 Some(
1218 filter
1219 .labels
1220 .into_iter()
1221 .filter_map(|label| {
1222 Some((
1223 label.key,
1224 match label.value? {
1225 server::v1::job_label_filter::Value::Exists(_) => {
1226 JobLabelFilterValue::Exists
1227 }
1228 server::v1::job_label_filter::Value::Equals(value) => {
1229 JobLabelFilterValue::Equals(value)
1230 }
1231 },
1232 ))
1233 })
1234 .collect(),
1235 )
1236 },
1237 active: filter.active,
1238 created_after: filter
1239 .created_at
1240 .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1241 created_before: filter
1242 .created_at
1243 .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1244 target_execution_time_after: filter
1245 .target_execution_time
1246 .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1247 target_execution_time_before: filter
1248 .target_execution_time
1249 .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1250 })
1251 }
1252}
1253
1254impl From<ScheduleDetails> for server::v1::Schedule {
1255 fn from(value: ScheduleDetails) -> Self {
1256 Self {
1257 id: value.id.to_string(),
1258 definition: Some(common::v1::ScheduleDefinition {
1259 job_timing_policy: Some(match value.job_timing_policy {
1260 ScheduleJobTimingPolicy::Repeat(policy) => {
1261 common::v1::ScheduleJobTimingPolicy {
1262 job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
1263 common::v1::ScheduleJobTimingPolicyRepeat {
1264 interval: policy.interval.try_into().ok(),
1265 immediate: policy.immediate,
1266 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1267 policy.missed_policy,
1268 )
1269 .into(),
1270 },
1271 )),
1272 }
1273 }
1274 ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
1275 job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
1276 common::v1::ScheduleJobTimingPolicyCron {
1277 cron_expression: policy.cron_expression,
1278 immediate: policy.immediate,
1279 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1280 policy.missed_policy,
1281 )
1282 .into(),
1283 },
1284 )),
1285 },
1286 }),
1287 job_creation_policy: Some(common::v1::ScheduleJobCreationPolicy {
1288 job_creation: Some(match value.job_creation_policy {
1289 ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
1290 JobCreation::JobDefinition(common::v1::JobDefinition {
1291 job_type_id: job_definition.job_type_id,
1292 input_payload_json: job_definition.input_payload_json,
1293 target_execution_time: None,
1294 retry_policy: Some(common::v1::JobRetryPolicy {
1295 retries: job_definition.retry_policy.retries,
1296 }),
1297 timeout_policy: Some(common::v1::JobTimeoutPolicy {
1298 timeout: job_definition
1299 .timeout_policy
1300 .timeout
1301 .and_then(|d| d.try_into().ok()),
1302 base_time: match job_definition.timeout_policy.base_time {
1303 JobTimeoutBaseTime::StartTime => {
1304 common::v1::JobTimeoutBaseTime::StartTime.into()
1305 }
1306 JobTimeoutBaseTime::TargetExecutionTime => {
1307 common::v1::JobTimeoutBaseTime::TargetExecutionTime
1308 .into()
1309 }
1310 },
1311 }),
1312 labels: job_definition
1313 .labels
1314 .into_iter()
1315 .map(|(key, value)| common::v1::JobLabel { key, value })
1316 .collect(),
1317 metadata_json: None,
1318 })
1319 }
1320 }),
1321 }),
1322 time_range: value.time_range.map(|range| TimeRange {
1323 start: range.start.map(Into::into),
1324 end: range.end.map(Into::into),
1325 }),
1326 labels: value
1327 .labels
1328 .into_iter()
1329 .map(|(key, value)| common::v1::ScheduleLabel { key, value })
1330 .collect(),
1331 metadata_json: value.metadata_json,
1332 }),
1333 created_at: Some(value.created_at.into()),
1334 active: value.active,
1335 cancelled: value.cancelled,
1336 }
1337 }
1338}
1339
1340impl From<server::v1::ScheduleQueryOrder> for ScheduleQueryOrder {
1341 fn from(value: server::v1::ScheduleQueryOrder) -> Self {
1342 match value {
1343 server::v1::ScheduleQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
1344 server::v1::ScheduleQueryOrder::CreatedAtDesc
1345 | server::v1::ScheduleQueryOrder::Unspecified => Self::CreatedAtDesc,
1346 }
1347 }
1348}
1349
1350impl TryFrom<server::v1::ScheduleQueryFilter> for ScheduleQueryFilters {
1351 type Error = Status;
1352
1353 fn try_from(value: server::v1::ScheduleQueryFilter) -> Result<Self, Self::Error> {
1354 Ok(Self {
1355 schedule_ids: if value.schedule_ids.is_empty() {
1356 None
1357 } else {
1358 Some(
1359 value
1360 .schedule_ids
1361 .iter()
1362 .map(|f| {
1363 f.parse().map_err(|err| {
1364 Status::invalid_argument(format!("invalid schedule ID: {err}"))
1365 })
1366 })
1367 .collect::<Result<_, _>>()?,
1368 )
1369 },
1370 job_ids: if value.job_ids.is_empty() {
1371 None
1372 } else {
1373 Some(
1374 value
1375 .job_ids
1376 .iter()
1377 .map(|f| {
1378 f.parse().map_err(|err| {
1379 Status::invalid_argument(format!("invalid job ID: {err}"))
1380 })
1381 })
1382 .collect::<Result<_, _>>()?,
1383 )
1384 },
1385 job_type_ids: if value.job_type_ids.is_empty() {
1386 None
1387 } else {
1388 Some(value.job_type_ids.into_iter().collect())
1389 },
1390 labels: if value.labels.is_empty() {
1391 None
1392 } else {
1393 Some(
1394 value
1395 .labels
1396 .into_iter()
1397 .filter_map(|label| {
1398 Some((
1399 label.key,
1400 match label.value? {
1401 server::v1::schedule_label_filter::Value::Exists(_) => {
1402 ScheduleLabelFilterValue::Exists
1403 }
1404 server::v1::schedule_label_filter::Value::Equals(value) => {
1405 ScheduleLabelFilterValue::Equals(value)
1406 }
1407 },
1408 ))
1409 })
1410 .collect(),
1411 )
1412 },
1413 active: value.active,
1414 created_after: value
1415 .created_at
1416 .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1417 created_before: value
1418 .created_at
1419 .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1420 })
1421 }
1422}
1423
1424impl From<JobType> for snapshot::v1::ExportedJobType {
1425 fn from(job_type: JobType) -> Self {
1426 snapshot::v1::ExportedJobType {
1427 job_type: Some(common::v1::JobType {
1428 id: job_type.id,
1429 name: Some(job_type.name),
1430 description: Some(job_type.description),
1431 input_schema_json: job_type.input_schema_json,
1432 output_schema_json: job_type.output_schema_json,
1433 }),
1434 }
1435 }
1436}
1437
1438impl From<snapshot::v1::ExportedJobType> for JobType {
1439 fn from(job_type: snapshot::v1::ExportedJobType) -> Self {
1440 let job_type = job_type.job_type.unwrap();
1441
1442 Self {
1443 id: job_type.id,
1444 name: job_type.name.unwrap_or_default(),
1445 description: job_type.description.unwrap_or_default(),
1446 input_schema_json: job_type.input_schema_json,
1447 output_schema_json: job_type.output_schema_json,
1448 }
1449 }
1450}