1use async_trait::async_trait;
4use futures::stream::BoxStream;
5use ora_proto::{
6 common::{
7 self,
8 v1::{
9 self, schedule_job_creation_policy::JobCreation, schedule_job_timing_policy,
10 JobDefinition, TimeRange,
11 },
12 },
13 server::{self, v1::Job},
14 snapshot,
15};
16use serde::{Deserialize, Serialize};
17use std::time::{Duration, SystemTime};
18use tonic::Status;
19use uuid::Uuid;
20
/// Insertion-ordered map (`indexmap::IndexMap`) using `ahash::RandomState` as its hasher.
pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
/// Insertion-ordered set (`indexmap::IndexSet`) using `ahash::RandomState` as its hasher.
pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;
25
/// Storage backend interface covering job types, jobs, executions and schedules.
///
/// All methods are async (through `async_trait`) and return `eyre::Result`.
/// Implementors must be `Send + Sync + 'static + Clone`.
///
/// NOTE(review): only the signatures are visible in this file. The per-method
/// notes below are derived from names and types; confirm the exact semantics
/// against the implementations.
#[async_trait]
pub trait Storage: Send + Sync + 'static + Clone {
    /// Records the given job types.
    async fn job_types_added(&self, job_types: Vec<JobType>) -> eyre::Result<()>;
    /// Records the given new jobs.
    async fn jobs_added(&self, jobs: Vec<NewJob>) -> eyre::Result<()>;
    /// Records cancellation of the given jobs at `timestamp` and returns
    /// [`CancelledJob`] entries (job ID plus an optional active execution ID).
    ///
    /// NOTE(review): presumably only jobs that were actually cancelled are
    /// returned — confirm.
    async fn jobs_cancelled(
        &self,
        job_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<CancelledJob>>;
    /// Records the given new executions, with `timestamp` as the event time.
    async fn executions_added(
        &self,
        executions: Vec<NewExecution>,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Records that the given executions became ready at `timestamp`.
    async fn executions_ready(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Records that an execution was assigned to an executor at `timestamp`.
    async fn execution_assigned(
        &self,
        execution_id: Uuid,
        executor_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Records that an execution started at `timestamp`.
    async fn execution_started(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Records that an execution succeeded at `timestamp` with the given
    /// JSON output payload.
    async fn execution_succeeded(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
        output_payload_json: String,
    ) -> eyre::Result<()>;
    /// Records that the given executions failed at `timestamp` with `reason`.
    ///
    /// NOTE(review): `mark_job_unschedulable` presumably also marks the owning
    /// jobs as unschedulable (i.e. no retries) — confirm.
    async fn executions_failed(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
        reason: String,
        mark_job_unschedulable: bool,
    ) -> eyre::Result<()>;

    /// Returns execution IDs considered orphaned with respect to `executor_ids`.
    ///
    /// NOTE(review): whether `executor_ids` lists the live executors or the
    /// lost ones cannot be told from this signature — confirm.
    async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>>;

    /// Records that the given jobs became unschedulable at `timestamp`.
    async fn jobs_unschedulable(&self, job_ids: &[Uuid], timestamp: SystemTime)
        -> eyre::Result<()>;
    /// Returns pending executions.
    ///
    /// NOTE(review): `after` is presumably a pagination cursor (the last ID
    /// already seen) — confirm.
    async fn pending_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingExecution>>;
    /// Returns ready executions; `after` presumably paginates as in
    /// `pending_executions` — TODO confirm.
    async fn ready_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<ReadyExecution>>;
    /// Returns pending jobs; `after` presumably paginates as in
    /// `pending_executions` — TODO confirm.
    async fn pending_jobs(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingJob>>;

    /// Queries jobs matching `filters` in the given `order`, returning at
    /// most `limit` entries (presumably) together with a continuation cursor.
    async fn query_jobs(
        &self,
        cursor: Option<String>,
        limit: usize,
        order: JobQueryOrder,
        filters: JobQueryFilters,
    ) -> eyre::Result<JobQueryResult>;

    /// Returns the IDs of the jobs matching `filters`.
    async fn query_job_ids(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Returns the number of jobs matching `filters`.
    async fn count_jobs(&self, filters: JobQueryFilters) -> eyre::Result<u64>;

    /// Returns the known job types.
    async fn query_job_types(&self) -> eyre::Result<Vec<JobType>>;

    /// Deletes the jobs matching `filters` and returns job IDs (presumably
    /// those of the deleted jobs — confirm).
    async fn delete_jobs(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Records the given new schedules.
    async fn schedules_added(&self, schedules: Vec<NewSchedule>) -> eyre::Result<()>;

    /// Records cancellation of the given schedules at `timestamp` and returns
    /// [`CancelledSchedule`] entries.
    ///
    /// NOTE(review): presumably only schedules that were actually cancelled
    /// are returned — confirm.
    async fn schedules_cancelled(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<CancelledSchedule>>;

    /// Records that the given schedules became unschedulable at `timestamp`.
    async fn schedules_unschedulable(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()>;

    /// Returns pending schedules; `after` presumably paginates as in
    /// `pending_executions` — TODO confirm.
    async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>>;

    /// Queries schedules matching `filters` in the given `order`.
    ///
    /// Note that `filters` precedes `order` here, unlike in `query_jobs`.
    async fn query_schedules(
        &self,
        cursor: Option<String>,
        limit: usize,
        filters: ScheduleQueryFilters,
        order: ScheduleQueryOrder,
    ) -> eyre::Result<ScheduleQueryResult>;

    /// Returns the IDs of the schedules matching `filters`.
    async fn query_schedule_ids(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Returns the number of schedules matching `filters`.
    async fn count_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<u64>;

    /// Deletes the schedules matching `filters` and returns schedule IDs
    /// (presumably those of the deleted schedules — confirm).
    async fn delete_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;
}
224
/// Snapshot export/import interface for a storage backend.
///
/// Snapshots are exchanged as streams of `snapshot::v1::SnapshotData` items,
/// each wrapped in an `eyre::Result`.
#[async_trait]
pub trait StorageSnapshot {
    /// Returns a `'static` boxed stream of snapshot data items.
    fn export_snapshot(&self) -> BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>;

    /// Consumes a stream of snapshot data items.
    ///
    /// NOTE(review): whether existing data is replaced or merged is not
    /// visible from this signature — confirm against the implementations.
    async fn import_snapshot(
        &self,
        snapshot: BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>,
    ) -> eyre::Result<()>;
}
242
/// A job to be added to storage; the element type accepted by
/// [`Storage::jobs_added`].
#[derive(Debug, Clone)]
pub struct NewJob {
    /// ID of the job.
    pub id: Uuid,
    /// ID of the associated schedule, if any (presumably the schedule that
    /// created the job — confirm).
    pub schedule_id: Option<Uuid>,
    /// Creation time of the job.
    pub created_at: SystemTime,
    /// ID of the job's type.
    pub job_type_id: String,
    /// Target execution time of the job.
    pub target_execution_time: SystemTime,
    /// Input payload as a JSON string.
    pub input_payload_json: String,
    /// Timeout policy of the job.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy of the job.
    pub retry_policy: JobRetryPolicy,
    /// Key/value labels attached to the job.
    pub labels: IndexMap<String, String>,
    /// Optional metadata as a JSON string.
    pub metadata_json: Option<String>,
}
267
/// A pending job as returned by [`Storage::pending_jobs`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingJob {
    /// ID of the job.
    pub id: Uuid,
    /// Target execution time of the job.
    pub target_execution_time: SystemTime,
    /// Number of executions of the job (presumably those created so far —
    /// confirm).
    pub execution_count: u64,
    /// Retry policy of the job.
    pub retry_policy: JobRetryPolicy,
    /// Timeout policy of the job.
    pub timeout_policy: JobTimeoutPolicy,
}
282
/// Timeout policy of a job.
///
/// The derived default is no timeout (`None`) with the default
/// [`JobTimeoutBaseTime`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct JobTimeoutPolicy {
    /// Timeout duration; `None` means no timeout value is set.
    pub timeout: Option<Duration>,
    /// Reference point the timeout is relative to.
    pub base_time: JobTimeoutBaseTime,
}
293
294impl From<v1::JobTimeoutPolicy> for JobTimeoutPolicy {
295 fn from(proto: v1::JobTimeoutPolicy) -> Self {
296 Self {
297 timeout: proto.timeout.and_then(|d| d.try_into().ok()),
298 base_time: proto.base_time().into(),
299 }
300 }
301}
302
303impl From<JobTimeoutPolicy> for v1::JobTimeoutPolicy {
304 fn from(policy: JobTimeoutPolicy) -> Self {
305 Self {
306 timeout: policy.timeout.and_then(|d| d.try_into().ok()),
307 base_time: v1::JobTimeoutBaseTime::from(policy.base_time).into(),
308 }
309 }
310}
311
312impl From<v1::JobTimeoutBaseTime> for JobTimeoutBaseTime {
313 fn from(proto: v1::JobTimeoutBaseTime) -> Self {
314 match proto {
315 v1::JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
316 v1::JobTimeoutBaseTime::StartTime | v1::JobTimeoutBaseTime::Unspecified => {
317 Self::StartTime
318 }
319 }
320 }
321}
322
323impl From<JobTimeoutBaseTime> for v1::JobTimeoutBaseTime {
324 fn from(base_time: JobTimeoutBaseTime) -> Self {
325 match base_time {
326 JobTimeoutBaseTime::StartTime => Self::StartTime,
327 JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
328 }
329 }
330}
331
/// Reference point a job timeout is relative to.
///
/// NOTE(review): the precise meaning of each variant is inferred from its
/// name — confirm against the scheduler logic.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum JobTimeoutBaseTime {
    /// Relative to the start time; this is the default and also what an
    /// unspecified protobuf value maps to.
    #[default]
    StartTime,
    /// Relative to the target execution time.
    TargetExecutionTime,
}
347
/// Retry policy of a job; the derived default has `retries == 0`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct JobRetryPolicy {
    /// Number of retries (presumably the maximum allowed after a failure —
    /// confirm).
    pub retries: u64,
}
356
357impl From<v1::JobRetryPolicy> for JobRetryPolicy {
358 fn from(proto: v1::JobRetryPolicy) -> Self {
359 Self {
360 retries: proto.retries,
361 }
362 }
363}
364
365impl From<JobRetryPolicy> for v1::JobRetryPolicy {
366 fn from(policy: JobRetryPolicy) -> Self {
367 Self {
368 retries: policy.retries,
369 }
370 }
371}
372
/// An execution to be added to storage; the element type accepted by
/// [`Storage::executions_added`].
#[derive(Debug, Clone)]
pub struct NewExecution {
    /// ID of the execution.
    pub id: Uuid,
    /// ID of the job the execution belongs to.
    pub job_id: Uuid,
    /// Attempt number of the execution (NOTE(review): zero- or one-based is
    /// not visible here — confirm).
    pub attempt_number: u64,
    /// Target execution time.
    pub target_execution_time: SystemTime,
}
385
/// A pending execution as returned by [`Storage::pending_executions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingExecution {
    /// ID of the execution.
    pub id: Uuid,
    /// Target execution time.
    pub target_execution_time: SystemTime,
}
394
/// A ready execution as returned by [`Storage::ready_executions`], bundled
/// with the job data carried alongside it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadyExecution {
    /// ID of the execution.
    pub id: Uuid,
    /// ID of the job the execution belongs to.
    pub job_id: Uuid,
    /// Input payload as a JSON string.
    pub input_payload_json: String,
    /// Attempt number of the execution.
    pub attempt_number: u64,
    /// ID of the job's type.
    pub job_type_id: String,
    /// Target execution time.
    pub target_execution_time: SystemTime,
    /// Timeout policy of the job.
    pub timeout_policy: JobTimeoutPolicy,
}
413
/// A job type as stored and as returned by [`Storage::query_job_types`].
///
/// `name` and `description` are plain strings here; the conversion to the
/// protobuf `common::v1::JobType` in this file turns empty strings into `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobType {
    /// ID of the job type.
    pub id: String,
    /// Name of the job type; may be empty.
    pub name: String,
    /// Description of the job type; may be empty.
    pub description: String,
    /// Optional input schema as a JSON string.
    pub input_schema_json: Option<String>,
    /// Optional output schema as a JSON string.
    pub output_schema_json: Option<String>,
}
428
/// Filters for querying, counting and deleting jobs.
///
/// Every field is optional; the derived default leaves all of them `None`.
///
/// NOTE(review): how the filters combine (presumably a logical AND, with
/// `None` meaning "no restriction") and whether the time bounds are inclusive
/// is decided by the storage implementations — confirm there.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct JobQueryFilters {
    /// Job IDs to match.
    pub job_ids: Option<IndexSet<Uuid>>,
    /// Job type IDs to match.
    pub job_type_ids: Option<IndexSet<String>>,
    /// Execution IDs to match.
    pub execution_ids: Option<IndexSet<Uuid>>,
    /// Schedule IDs to match.
    pub schedule_ids: Option<IndexSet<Uuid>>,
    /// Execution statuses to match.
    pub execution_status: Option<IndexSet<JobExecutionStatus>>,
    /// Label filters keyed by label key.
    pub labels: Option<IndexMap<String, JobLabelFilterValue>>,
    /// Filter on the job's `active` flag.
    pub active: Option<bool>,
    /// Lower bound on the creation time.
    pub created_after: Option<SystemTime>,
    /// Upper bound on the creation time.
    pub created_before: Option<SystemTime>,
    /// Lower bound on the target execution time.
    pub target_execution_time_after: Option<SystemTime>,
    /// Upper bound on the target execution time.
    pub target_execution_time_before: Option<SystemTime>,
}
455
/// Sort order for job queries.
///
/// The conversion from the protobuf enum in this file maps an unspecified
/// order to `CreatedAtDesc`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum JobQueryOrder {
    /// By creation time, ascending.
    CreatedAtAsc,
    /// By creation time, descending.
    CreatedAtDesc,
    /// By target execution time, ascending.
    TargetExecutionTimeAsc,
    /// By target execution time, descending.
    TargetExecutionTimeDesc,
}
468
/// Result page returned by [`Storage::query_jobs`].
#[derive(Debug, Clone)]
pub struct JobQueryResult {
    /// The jobs in this page.
    pub jobs: Vec<JobDetails>,
    /// Cursor value, if any (presumably to be passed back to `query_jobs` to
    /// continue — confirm).
    pub cursor: Option<String>,
    /// Whether more results are available.
    pub has_more: bool,
}
479
/// Status of a job execution; mirrors the non-`Unspecified` variants of
/// `server::v1::JobExecutionStatus`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobExecutionStatus {
    /// The execution is pending.
    Pending,
    /// The execution is ready.
    Ready,
    /// The execution is assigned.
    Assigned,
    /// The execution is running.
    Running,
    /// The execution succeeded.
    Succeeded,
    /// The execution failed.
    Failed,
}
496
/// Condition applied to a single job label in [`JobQueryFilters::labels`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobLabelFilterValue {
    /// The label must exist (presumably with any value — confirm).
    Exists,
    /// The label must equal the given value.
    Equals(String),
}
505
/// Full details of a job, as contained in [`JobQueryResult`] and convertible
/// into the protobuf `server::v1::Job`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobDetails {
    /// Whether the job is active.
    pub active: bool,
    /// Whether the job was cancelled.
    pub cancelled: bool,
    /// ID of the job.
    pub id: Uuid,
    /// ID of the job's type.
    pub job_type_id: String,
    /// ID of the associated schedule, if any.
    pub schedule_id: Option<Uuid>,
    /// Target execution time.
    pub target_execution_time: SystemTime,
    /// Input payload as a JSON string.
    pub input_payload_json: String,
    /// Key/value labels attached to the job.
    pub labels: IndexMap<String, String>,
    /// Timeout policy of the job.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy of the job.
    pub retry_policy: JobRetryPolicy,
    /// Creation time of the job.
    pub created_at: SystemTime,
    /// Executions of the job.
    pub executions: Vec<ExecutionDetails>,
    /// Optional metadata as a JSON string.
    pub metadata_json: Option<String>,
}
538
/// Full details of a single execution of a job, convertible into the
/// protobuf `server::v1::JobExecution`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionDetails {
    /// ID of the execution.
    pub id: Uuid,
    /// ID of the job the execution belongs to.
    pub job_id: Uuid,
    /// ID of the executor, if one is set.
    pub executor_id: Option<Uuid>,
    /// Current status of the execution.
    pub status: JobExecutionStatus,
    /// Creation time of the execution.
    pub created_at: SystemTime,
    /// Time the execution became ready, if set.
    pub ready_at: Option<SystemTime>,
    /// Time the execution was assigned, if set.
    pub assigned_at: Option<SystemTime>,
    /// Time the execution started, if set.
    pub started_at: Option<SystemTime>,
    /// Time the execution succeeded, if set.
    pub succeeded_at: Option<SystemTime>,
    /// Time the execution failed, if set.
    pub failed_at: Option<SystemTime>,
    /// Output payload as a JSON string, if set.
    pub output_payload_json: Option<String>,
    /// Failure reason, if set.
    pub failure_reason: Option<String>,
}
568
/// Entry returned by [`Storage::jobs_cancelled`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancelledJob {
    /// ID of the job.
    pub id: Uuid,
    /// ID of the job's active execution, if any (presumably the execution
    /// that was in progress when the job was cancelled — confirm).
    pub active_execution: Option<Uuid>,
}
577
/// A schedule to be added to storage; the element type accepted by
/// [`Storage::schedules_added`].
#[derive(Debug, Clone)]
pub struct NewSchedule {
    /// ID of the schedule.
    pub id: Uuid,
    /// Creation time of the schedule.
    pub created_at: SystemTime,
    /// Policy that determines job timing.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy that determines how jobs are created.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// Key/value labels attached to the schedule.
    pub labels: IndexMap<String, String>,
    /// Optional time range of the schedule.
    pub time_range: Option<ScheduleTimeRange>,
    /// Optional metadata as a JSON string.
    pub metadata_json: Option<String>,
}
596
/// Timing policy of a schedule: either a fixed repeat interval or a cron
/// expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleJobTimingPolicy {
    /// Interval-based timing.
    Repeat(SchedulingPolicyRepeat),
    /// Cron-expression-based timing.
    Cron(SchedulingPolicyCron),
}
605
/// Interval-based schedule timing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulingPolicyRepeat {
    /// Interval between jobs.
    pub interval: Duration,
    /// NOTE(review): presumably whether a job is created right away instead
    /// of after the first interval — confirm.
    pub immediate: bool,
    /// What to do about missed times.
    pub missed_policy: ScheduleMissedTimePolicy,
}
616
/// Cron-based schedule timing.
///
/// When built from the protobuf message in this file, the expression has
/// already been validated with `cronexpr` using UTC as fallback timezone.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulingPolicyCron {
    /// The cron expression.
    pub cron_expression: String,
    /// NOTE(review): presumably whether a job is created right away instead
    /// of at the first matching time — confirm.
    pub immediate: bool,
    /// What to do about missed times.
    pub missed_policy: ScheduleMissedTimePolicy,
}
627
/// Policy for times a schedule missed.
///
/// The conversion from the protobuf enum in this file maps an unspecified
/// value to `Skip`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleMissedTimePolicy {
    /// Missed times are skipped.
    Skip,
    /// NOTE(review): presumably a job is created for each missed time —
    /// confirm.
    Create,
}
636
/// How a schedule creates its jobs; currently a single variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleJobCreationPolicy {
    /// Jobs are created from a fixed job definition.
    JobDefinition(ScheduleNewJobDefinition),
}
643
/// Job definition carried by a schedule.
///
/// It has no target execution time or metadata; the protobuf conversions in
/// this file emit `None` for both.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleNewJobDefinition {
    /// ID of the job type.
    pub job_type_id: String,
    /// Input payload as a JSON string.
    pub input_payload_json: String,
    /// Timeout policy for the jobs.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy for the jobs.
    pub retry_policy: JobRetryPolicy,
    /// Key/value labels for the jobs.
    pub labels: IndexMap<String, String>,
}
658
/// Entry returned by [`Storage::schedules_cancelled`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancelledSchedule {
    /// ID of the schedule.
    pub id: Uuid,
}
665
/// A pending schedule as returned by [`Storage::pending_schedules`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingSchedule {
    /// ID of the schedule.
    pub id: Uuid,
    /// Policy that determines job timing.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy that determines how jobs are created.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// Last target execution time, if any (presumably that of the most
    /// recent job created by the schedule — confirm).
    pub last_target_execution_time: Option<SystemTime>,
    /// Optional time range of the schedule.
    pub time_range: Option<ScheduleTimeRange>,
}
681
/// Filters for querying, counting and deleting schedules.
///
/// Every field is optional; the derived default leaves all of them `None`.
///
/// NOTE(review): how the filters combine (presumably a logical AND, with
/// `None` meaning "no restriction") and whether the time bounds are inclusive
/// is decided by the storage implementations — confirm there.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ScheduleQueryFilters {
    /// Schedule IDs to match.
    pub schedule_ids: Option<IndexSet<Uuid>>,
    /// Job IDs to match.
    pub job_ids: Option<IndexSet<Uuid>>,
    /// Job type IDs to match.
    pub job_type_ids: Option<IndexSet<String>>,
    /// Label filters keyed by label key.
    pub labels: Option<IndexMap<String, ScheduleLabelFilterValue>>,
    /// Filter on the schedule's `active` flag.
    pub active: Option<bool>,
    /// Lower bound on the creation time.
    pub created_after: Option<SystemTime>,
    /// Upper bound on the creation time.
    pub created_before: Option<SystemTime>,
}
700
/// Sort order for schedule queries.
///
/// The conversion from the protobuf enum in this file maps an unspecified
/// order to `CreatedAtDesc`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ScheduleQueryOrder {
    /// By creation time, ascending.
    CreatedAtAsc,
    /// By creation time, descending.
    CreatedAtDesc,
}
709
/// Result page returned by [`Storage::query_schedules`].
#[derive(Debug, Clone)]
pub struct ScheduleQueryResult {
    /// The schedules in this page.
    pub schedules: Vec<ScheduleDetails>,
    /// Cursor value, if any (presumably to be passed back to
    /// `query_schedules` to continue — confirm).
    pub cursor: Option<String>,
    /// Whether more results are available.
    pub has_more: bool,
}
720
/// Full details of a schedule, as contained in [`ScheduleQueryResult`] and
/// convertible into the protobuf `server::v1::Schedule`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleDetails {
    /// ID of the schedule.
    pub id: Uuid,
    /// Creation time of the schedule.
    pub created_at: SystemTime,
    /// Policy that determines job timing.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy that determines how jobs are created.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// Key/value labels attached to the schedule.
    pub labels: IndexMap<String, String>,
    /// Whether the schedule is active.
    pub active: bool,
    /// Whether the schedule was cancelled.
    pub cancelled: bool,
    /// Optional time range of the schedule.
    pub time_range: Option<ScheduleTimeRange>,
    /// Optional metadata as a JSON string.
    pub metadata_json: Option<String>,
}
747
/// Condition applied to a single schedule label in
/// [`ScheduleQueryFilters::labels`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ScheduleLabelFilterValue {
    /// The label must exist (presumably with any value — confirm).
    Exists,
    /// The label must equal the given value.
    Equals(String),
}
756
/// Optional time window of a schedule.
///
/// Either bound may be absent, in which case that side is unbounded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScheduleTimeRange {
    /// Inclusive lower bound, if any.
    pub start: Option<SystemTime>,
    /// Exclusive upper bound, if any.
    pub end: Option<SystemTime>,
}

impl ScheduleTimeRange {
    /// Returns whether `time` lies within the half-open range `[start, end)`.
    ///
    /// A missing bound never excludes a time.
    #[must_use]
    pub fn contains(&self, time: SystemTime) -> bool {
        let at_or_after_start = self.start.map_or(true, |start| start <= time);
        let before_end = self.end.map_or(true, |end| time < end);

        at_or_after_start && before_end
    }

    /// Returns whether the range is well-formed: when both bounds are set,
    /// `start` must be strictly before `end`; otherwise it is always valid.
    #[must_use]
    pub fn is_valid(&self) -> bool {
        match (self.start, self.end) {
            (Some(start), Some(end)) => start < end,
            _ => true,
        }
    }
}
793
794impl From<ScheduleJobTimingPolicy> for common::v1::ScheduleJobTimingPolicy {
795 fn from(value: ScheduleJobTimingPolicy) -> Self {
796 match value {
797 ScheduleJobTimingPolicy::Repeat(policy) => common::v1::ScheduleJobTimingPolicy {
798 job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
799 common::v1::ScheduleJobTimingPolicyRepeat {
800 interval: Some(policy.interval.try_into().unwrap()),
801 immediate: policy.immediate,
802 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
803 policy.missed_policy,
804 )
805 .into(),
806 },
807 )),
808 },
809 ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
810 job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
811 common::v1::ScheduleJobTimingPolicyCron {
812 cron_expression: policy.cron_expression,
813 immediate: policy.immediate,
814 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
815 policy.missed_policy,
816 )
817 .into(),
818 },
819 )),
820 },
821 }
822 }
823}
824
825impl TryFrom<common::v1::ScheduleJobTimingPolicy> for ScheduleJobTimingPolicy {
826 type Error = Status;
827
828 fn try_from(value: common::v1::ScheduleJobTimingPolicy) -> Result<Self, Self::Error> {
829 let value = value.job_timing.ok_or_else(|| {
830 Status::invalid_argument("missing job timing policy in schedule definition")
831 })?;
832
833 match value {
834 schedule_job_timing_policy::JobTiming::Repeat(policy) => {
835 Ok(Self::Repeat(SchedulingPolicyRepeat {
836 interval: policy
837 .interval
838 .ok_or_else(|| {
839 Status::invalid_argument("missing interval in repeat policy")
840 })?
841 .try_into()
842 .map_err(|_| Status::invalid_argument("invalid interval"))?,
843 immediate: policy.immediate,
844 missed_policy: policy.missed_time_policy().into(),
845 }))
846 }
847 schedule_job_timing_policy::JobTiming::Cron(policy) => {
848 Ok(Self::Cron(SchedulingPolicyCron {
849 missed_policy: policy.missed_time_policy().into(),
850 cron_expression: {
851 let mut parse_options = cronexpr::ParseOptions::default();
852 parse_options.fallback_timezone_option =
853 cronexpr::FallbackTimezoneOption::UTC;
854
855 cronexpr::parse_crontab_with(&policy.cron_expression, parse_options)
857 .map_err(|err| {
858 Status::invalid_argument(format!("invalid cron expression: {err}"))
859 })?;
860
861 policy.cron_expression
862 },
863 immediate: policy.immediate,
864 }))
865 }
866 }
867 }
868}
869
870impl From<ScheduleJobCreationPolicy> for common::v1::ScheduleJobCreationPolicy {
871 fn from(value: ScheduleJobCreationPolicy) -> Self {
872 match value {
873 ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
874 common::v1::ScheduleJobCreationPolicy {
875 job_creation: Some(JobCreation::JobDefinition(common::v1::JobDefinition {
876 job_type_id: job_definition.job_type_id,
877 input_payload_json: job_definition.input_payload_json,
878 target_execution_time: None,
879 retry_policy: Some(common::v1::JobRetryPolicy {
880 retries: job_definition.retry_policy.retries,
881 }),
882 timeout_policy: Some(common::v1::JobTimeoutPolicy {
883 timeout: job_definition
884 .timeout_policy
885 .timeout
886 .and_then(|d| d.try_into().ok()),
887 base_time: match job_definition.timeout_policy.base_time {
888 JobTimeoutBaseTime::StartTime => {
889 common::v1::JobTimeoutBaseTime::StartTime.into()
890 }
891 JobTimeoutBaseTime::TargetExecutionTime => {
892 common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
893 }
894 },
895 }),
896 labels: job_definition
897 .labels
898 .into_iter()
899 .map(|(key, value)| common::v1::JobLabel { key, value })
900 .collect(),
901 metadata_json: None,
902 })),
903 }
904 }
905 }
906 }
907}
908
909impl TryFrom<common::v1::ScheduleJobCreationPolicy> for ScheduleJobCreationPolicy {
910 type Error = Status;
911
912 fn try_from(value: common::v1::ScheduleJobCreationPolicy) -> Result<Self, Self::Error> {
913 let value = value.job_creation.ok_or_else(|| {
914 Status::invalid_argument("missing job creation policy in schedule definition")
915 })?;
916
917 match value {
918 JobCreation::JobDefinition(job_definition) => {
919 Ok(Self::JobDefinition(ScheduleNewJobDefinition {
920 job_type_id: job_definition.job_type_id,
921 input_payload_json: job_definition.input_payload_json,
922 timeout_policy: job_definition
923 .timeout_policy
924 .map(Into::into)
925 .unwrap_or_default(),
926 retry_policy: job_definition
927 .retry_policy
928 .map(Into::into)
929 .unwrap_or_default(),
930 labels: job_definition
931 .labels
932 .into_iter()
933 .map(|label| (label.key, label.value))
934 .collect(),
935 }))
936 }
937 }
938 }
939}
940
941impl From<ScheduleTimeRange> for TimeRange {
942 fn from(value: ScheduleTimeRange) -> Self {
943 Self {
944 start: value.start.map(Into::into),
945 end: value.end.map(Into::into),
946 }
947 }
948}
949
950impl TryFrom<TimeRange> for ScheduleTimeRange {
951 type Error = Status;
952
953 fn try_from(value: TimeRange) -> Result<Self, Self::Error> {
954 Ok(Self {
955 start: value
956 .start
957 .map(SystemTime::try_from)
958 .transpose()
959 .map_err(|error| {
960 Status::invalid_argument(format!("unsupported timestamp: {error}"))
961 })?,
962 end: value
963 .end
964 .map(SystemTime::try_from)
965 .transpose()
966 .map_err(|error| {
967 Status::invalid_argument(format!("unsupported timestamp: {error}"))
968 })?,
969 })
970 }
971}
972
973impl From<common::v1::ScheduleMissedTimePolicy> for ScheduleMissedTimePolicy {
974 fn from(value: common::v1::ScheduleMissedTimePolicy) -> Self {
975 match value {
976 common::v1::ScheduleMissedTimePolicy::Skip
977 | common::v1::ScheduleMissedTimePolicy::Unspecified => Self::Skip,
978 common::v1::ScheduleMissedTimePolicy::Create => Self::Create,
979 }
980 }
981}
982
983impl From<ScheduleMissedTimePolicy> for common::v1::ScheduleMissedTimePolicy {
984 fn from(value: ScheduleMissedTimePolicy) -> Self {
985 match value {
986 ScheduleMissedTimePolicy::Skip => Self::Skip,
987 ScheduleMissedTimePolicy::Create => Self::Create,
988 }
989 }
990}
991
992impl From<server::v1::JobQueryOrder> for JobQueryOrder {
993 fn from(value: server::v1::JobQueryOrder) -> Self {
994 match value {
995 server::v1::JobQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
996 server::v1::JobQueryOrder::CreatedAtDesc | server::v1::JobQueryOrder::Unspecified => {
997 Self::CreatedAtDesc
998 }
999 server::v1::JobQueryOrder::TargetExecutionTimeAsc => Self::TargetExecutionTimeAsc,
1000 server::v1::JobQueryOrder::TargetExecutionTimeDesc => Self::TargetExecutionTimeDesc,
1001 }
1002 }
1003}
1004
1005impl From<JobDetails> for Job {
1006 fn from(job: JobDetails) -> Self {
1007 Job {
1008 id: job.id.to_string(),
1009 schedule_id: job.schedule_id.map(|t| t.to_string()),
1010 cancelled: job.cancelled,
1011 active: job.active,
1012 definition: Some(JobDefinition {
1013 job_type_id: job.job_type_id,
1014 input_payload_json: job.input_payload_json,
1015 target_execution_time: Some(job.target_execution_time.into()),
1016 retry_policy: Some(common::v1::JobRetryPolicy {
1017 retries: job.retry_policy.retries,
1018 }),
1019 timeout_policy: Some(common::v1::JobTimeoutPolicy {
1020 timeout: job.timeout_policy.timeout.and_then(|d| d.try_into().ok()),
1021 base_time: match job.timeout_policy.base_time {
1022 JobTimeoutBaseTime::StartTime => {
1023 common::v1::JobTimeoutBaseTime::StartTime.into()
1024 }
1025 JobTimeoutBaseTime::TargetExecutionTime => {
1026 common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
1027 }
1028 },
1029 }),
1030 labels: job
1031 .labels
1032 .into_iter()
1033 .map(|(key, value)| common::v1::JobLabel { key, value })
1034 .collect(),
1035 metadata_json: job.metadata_json,
1036 }),
1037 created_at: Some(job.created_at.into()),
1038 executions: job.executions.into_iter().map(Into::into).collect(),
1039 }
1040 }
1041}
1042
1043impl From<ExecutionDetails> for server::v1::JobExecution {
1044 fn from(value: ExecutionDetails) -> Self {
1045 Self {
1046 id: value.id.to_string(),
1047 job_id: value.job_id.to_string(),
1048 executor_id: value.executor_id.map(|t| t.to_string()),
1049 status: match value.status {
1050 JobExecutionStatus::Pending => server::v1::JobExecutionStatus::Pending,
1051 JobExecutionStatus::Ready => server::v1::JobExecutionStatus::Ready,
1052 JobExecutionStatus::Assigned => server::v1::JobExecutionStatus::Assigned,
1053 JobExecutionStatus::Running => server::v1::JobExecutionStatus::Running,
1054 JobExecutionStatus::Succeeded => server::v1::JobExecutionStatus::Succeeded,
1055 JobExecutionStatus::Failed => server::v1::JobExecutionStatus::Failed,
1056 }
1057 .into(),
1058 created_at: Some(value.created_at.into()),
1059 ready_at: value.ready_at.map(Into::into),
1060 assigned_at: value.assigned_at.map(Into::into),
1061 started_at: value.started_at.map(Into::into),
1062 succeeded_at: value.succeeded_at.map(Into::into),
1063 failed_at: value.failed_at.map(Into::into),
1064 output_payload_json: value.output_payload_json,
1065 failure_reason: value.failure_reason,
1066 }
1067 }
1068}
1069
1070impl From<JobType> for ora_proto::common::v1::JobType {
1071 fn from(value: JobType) -> Self {
1072 Self {
1073 id: value.id,
1074 name: if value.name.is_empty() {
1075 None
1076 } else {
1077 Some(value.name)
1078 },
1079 description: if value.description.is_empty() {
1080 None
1081 } else {
1082 Some(value.description)
1083 },
1084 input_schema_json: value.input_schema_json,
1085 output_schema_json: value.output_schema_json,
1086 }
1087 }
1088}
1089
1090impl TryFrom<server::v1::JobQueryFilter> for JobQueryFilters {
1091 type Error = Status;
1092
1093 fn try_from(filter: server::v1::JobQueryFilter) -> Result<Self, Self::Error> {
1094 Ok(Self {
1095 execution_status: {
1096 let status = filter
1097 .status()
1098 .filter_map(|status| match status {
1099 server::v1::JobExecutionStatus::Unspecified => None,
1100 server::v1::JobExecutionStatus::Pending => {
1101 Some(JobExecutionStatus::Pending)
1102 }
1103 server::v1::JobExecutionStatus::Ready => Some(JobExecutionStatus::Ready),
1104 server::v1::JobExecutionStatus::Assigned => {
1105 Some(JobExecutionStatus::Assigned)
1106 }
1107 server::v1::JobExecutionStatus::Running => {
1108 Some(JobExecutionStatus::Running)
1109 }
1110 server::v1::JobExecutionStatus::Succeeded => {
1111 Some(JobExecutionStatus::Succeeded)
1112 }
1113 server::v1::JobExecutionStatus::Failed => Some(JobExecutionStatus::Failed),
1114 })
1115 .collect::<IndexSet<_>>();
1116
1117 if status.is_empty() {
1118 None
1119 } else {
1120 Some(status)
1121 }
1122 },
1123 job_ids: if filter.job_ids.is_empty() {
1124 None
1125 } else {
1126 Some(
1127 filter
1128 .job_ids
1129 .iter()
1130 .map(|f| {
1131 f.parse().map_err(|err| {
1132 Status::invalid_argument(format!("invalid job ID: {err}"))
1133 })
1134 })
1135 .collect::<Result<_, _>>()?,
1136 )
1137 },
1138 job_type_ids: if filter.job_type_ids.is_empty() {
1139 None
1140 } else {
1141 Some(filter.job_type_ids.into_iter().collect())
1142 },
1143 execution_ids: if filter.execution_ids.is_empty() {
1144 None
1145 } else {
1146 Some(
1147 filter
1148 .execution_ids
1149 .iter()
1150 .map(|f| {
1151 f.parse().map_err(|err| {
1152 Status::invalid_argument(format!("invalid execution ID: {err}"))
1153 })
1154 })
1155 .collect::<Result<_, _>>()?,
1156 )
1157 },
1158 schedule_ids: if filter.schedule_ids.is_empty() {
1159 None
1160 } else {
1161 Some(
1162 filter
1163 .schedule_ids
1164 .iter()
1165 .map(|f| {
1166 f.parse().map_err(|err| {
1167 Status::invalid_argument(format!("invalid schedule ID: {err}"))
1168 })
1169 })
1170 .collect::<Result<_, _>>()?,
1171 )
1172 },
1173 labels: if filter.labels.is_empty() {
1174 None
1175 } else {
1176 Some(
1177 filter
1178 .labels
1179 .into_iter()
1180 .filter_map(|label| {
1181 Some((
1182 label.key,
1183 match label.value? {
1184 server::v1::job_label_filter::Value::Exists(_) => {
1185 JobLabelFilterValue::Exists
1186 }
1187 server::v1::job_label_filter::Value::Equals(value) => {
1188 JobLabelFilterValue::Equals(value)
1189 }
1190 },
1191 ))
1192 })
1193 .collect(),
1194 )
1195 },
1196 active: filter.active,
1197 created_after: filter
1198 .created_at
1199 .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1200 created_before: filter
1201 .created_at
1202 .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1203 target_execution_time_after: filter
1204 .target_execution_time
1205 .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1206 target_execution_time_before: filter
1207 .target_execution_time
1208 .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1209 })
1210 }
1211}
1212
1213impl From<ScheduleDetails> for server::v1::Schedule {
1214 fn from(value: ScheduleDetails) -> Self {
1215 Self {
1216 id: value.id.to_string(),
1217 definition: Some(common::v1::ScheduleDefinition {
1218 job_timing_policy: Some(match value.job_timing_policy {
1219 ScheduleJobTimingPolicy::Repeat(policy) => {
1220 common::v1::ScheduleJobTimingPolicy {
1221 job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
1222 common::v1::ScheduleJobTimingPolicyRepeat {
1223 interval: policy.interval.try_into().ok(),
1224 immediate: policy.immediate,
1225 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1226 policy.missed_policy,
1227 )
1228 .into(),
1229 },
1230 )),
1231 }
1232 }
1233 ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
1234 job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
1235 common::v1::ScheduleJobTimingPolicyCron {
1236 cron_expression: policy.cron_expression,
1237 immediate: policy.immediate,
1238 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1239 policy.missed_policy,
1240 )
1241 .into(),
1242 },
1243 )),
1244 },
1245 }),
1246 job_creation_policy: Some(common::v1::ScheduleJobCreationPolicy {
1247 job_creation: Some(match value.job_creation_policy {
1248 ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
1249 JobCreation::JobDefinition(common::v1::JobDefinition {
1250 job_type_id: job_definition.job_type_id,
1251 input_payload_json: job_definition.input_payload_json,
1252 target_execution_time: None,
1253 retry_policy: Some(common::v1::JobRetryPolicy {
1254 retries: job_definition.retry_policy.retries,
1255 }),
1256 timeout_policy: Some(common::v1::JobTimeoutPolicy {
1257 timeout: job_definition
1258 .timeout_policy
1259 .timeout
1260 .and_then(|d| d.try_into().ok()),
1261 base_time: match job_definition.timeout_policy.base_time {
1262 JobTimeoutBaseTime::StartTime => {
1263 common::v1::JobTimeoutBaseTime::StartTime.into()
1264 }
1265 JobTimeoutBaseTime::TargetExecutionTime => {
1266 common::v1::JobTimeoutBaseTime::TargetExecutionTime
1267 .into()
1268 }
1269 },
1270 }),
1271 labels: job_definition
1272 .labels
1273 .into_iter()
1274 .map(|(key, value)| common::v1::JobLabel { key, value })
1275 .collect(),
1276 metadata_json: None,
1277 })
1278 }
1279 }),
1280 }),
1281 time_range: value.time_range.map(|range| TimeRange {
1282 start: range.start.map(Into::into),
1283 end: range.end.map(Into::into),
1284 }),
1285 labels: value
1286 .labels
1287 .into_iter()
1288 .map(|(key, value)| common::v1::ScheduleLabel { key, value })
1289 .collect(),
1290 metadata_json: value.metadata_json,
1291 }),
1292 created_at: Some(value.created_at.into()),
1293 active: value.active,
1294 cancelled: value.cancelled,
1295 }
1296 }
1297}
1298
1299impl From<server::v1::ScheduleQueryOrder> for ScheduleQueryOrder {
1300 fn from(value: server::v1::ScheduleQueryOrder) -> Self {
1301 match value {
1302 server::v1::ScheduleQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
1303 server::v1::ScheduleQueryOrder::CreatedAtDesc
1304 | server::v1::ScheduleQueryOrder::Unspecified => Self::CreatedAtDesc,
1305 }
1306 }
1307}
1308
1309impl TryFrom<server::v1::ScheduleQueryFilter> for ScheduleQueryFilters {
1310 type Error = Status;
1311
1312 fn try_from(value: server::v1::ScheduleQueryFilter) -> Result<Self, Self::Error> {
1313 Ok(Self {
1314 schedule_ids: if value.schedule_ids.is_empty() {
1315 None
1316 } else {
1317 Some(
1318 value
1319 .schedule_ids
1320 .iter()
1321 .map(|f| {
1322 f.parse().map_err(|err| {
1323 Status::invalid_argument(format!("invalid schedule ID: {err}"))
1324 })
1325 })
1326 .collect::<Result<_, _>>()?,
1327 )
1328 },
1329 job_ids: if value.job_ids.is_empty() {
1330 None
1331 } else {
1332 Some(
1333 value
1334 .job_ids
1335 .iter()
1336 .map(|f| {
1337 f.parse().map_err(|err| {
1338 Status::invalid_argument(format!("invalid job ID: {err}"))
1339 })
1340 })
1341 .collect::<Result<_, _>>()?,
1342 )
1343 },
1344 job_type_ids: if value.job_type_ids.is_empty() {
1345 None
1346 } else {
1347 Some(value.job_type_ids.into_iter().collect())
1348 },
1349 labels: if value.labels.is_empty() {
1350 None
1351 } else {
1352 Some(
1353 value
1354 .labels
1355 .into_iter()
1356 .filter_map(|label| {
1357 Some((
1358 label.key,
1359 match label.value? {
1360 server::v1::schedule_label_filter::Value::Exists(_) => {
1361 ScheduleLabelFilterValue::Exists
1362 }
1363 server::v1::schedule_label_filter::Value::Equals(value) => {
1364 ScheduleLabelFilterValue::Equals(value)
1365 }
1366 },
1367 ))
1368 })
1369 .collect(),
1370 )
1371 },
1372 active: value.active,
1373 created_after: value
1374 .created_at
1375 .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1376 created_before: value
1377 .created_at
1378 .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1379 })
1380 }
1381}
1382
1383impl From<JobType> for snapshot::v1::ExportedJobType {
1384 fn from(job_type: JobType) -> Self {
1385 snapshot::v1::ExportedJobType {
1386 job_type: Some(common::v1::JobType {
1387 id: job_type.id,
1388 name: Some(job_type.name),
1389 description: Some(job_type.description),
1390 input_schema_json: job_type.input_schema_json,
1391 output_schema_json: job_type.output_schema_json,
1392 }),
1393 }
1394 }
1395}
1396
1397impl From<snapshot::v1::ExportedJobType> for JobType {
1398 fn from(job_type: snapshot::v1::ExportedJobType) -> Self {
1399 let job_type = job_type.job_type.unwrap();
1400
1401 Self {
1402 id: job_type.id,
1403 name: job_type.name.unwrap_or_default(),
1404 description: job_type.description.unwrap_or_default(),
1405 input_schema_json: job_type.input_schema_json,
1406 output_schema_json: job_type.output_schema_json,
1407 }
1408 }
1409}