1use async_trait::async_trait;
4use futures::stream::BoxStream;
5use ora_proto::{
6 common::{
7 self,
8 v1::{
9 self, schedule_job_creation_policy::JobCreation, schedule_job_timing_policy,
10 JobDefinition, TimeRange,
11 },
12 },
13 server::{self, v1::Job},
14 snapshot,
15};
16use serde::{Deserialize, Serialize};
17use std::time::{Duration, SystemTime};
18use tonic::Status;
19use uuid::Uuid;
20
21pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
23pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;
25
26#[async_trait]
29pub trait Storage: Send + Sync + 'static + Clone {
30 async fn job_types_added(&self, job_types: Vec<JobType>) -> eyre::Result<()>;
32 async fn jobs_added(&self, jobs: Vec<NewJob>) -> eyre::Result<()>;
34 async fn jobs_cancelled(
41 &self,
42 job_ids: &[Uuid],
43 timestamp: SystemTime,
44 ) -> eyre::Result<Vec<CancelledJob>>;
45 async fn executions_added(
47 &self,
48 executions: Vec<NewExecution>,
49 timestamp: SystemTime,
50 ) -> eyre::Result<()>;
51 async fn executions_ready(
53 &self,
54 execution_ids: &[Uuid],
55 timestamp: SystemTime,
56 ) -> eyre::Result<()>;
57 async fn execution_assigned(
59 &self,
60 execution_id: Uuid,
61 executor_id: Uuid,
62 timestamp: SystemTime,
63 ) -> eyre::Result<()>;
64 async fn execution_started(
66 &self,
67 execution_id: Uuid,
68 timestamp: SystemTime,
69 ) -> eyre::Result<()>;
70 async fn execution_succeeded(
72 &self,
73 execution_id: Uuid,
74 timestamp: SystemTime,
75 output_payload_json: String,
76 ) -> eyre::Result<()>;
77 async fn executions_failed(
81 &self,
82 execution_ids: &[Uuid],
83 timestamp: SystemTime,
84 reason: String,
85 mark_job_unschedulable: bool,
86 ) -> eyre::Result<()>;
87
88 async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>>;
91
92 async fn jobs_unschedulable(&self, job_ids: &[Uuid], timestamp: SystemTime)
97 -> eyre::Result<()>;
98 async fn pending_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingExecution>>;
111 async fn ready_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<ReadyExecution>>;
124 async fn pending_jobs(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingJob>>;
137
138 async fn query_jobs(
142 &self,
143 cursor: Option<String>,
144 limit: usize,
145 order: JobQueryOrder,
146 filters: JobQueryFilters,
147 ) -> eyre::Result<JobQueryResult>;
148
149 async fn query_job_ids(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;
151
152 async fn count_jobs(&self, filters: JobQueryFilters) -> eyre::Result<u64>;
154
155 async fn query_job_types(&self) -> eyre::Result<Vec<JobType>>;
157
158 async fn delete_jobs(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;
162
163 async fn schedules_added(&self, schedules: Vec<NewSchedule>) -> eyre::Result<()>;
165
166 async fn schedules_cancelled(
173 &self,
174 schedule_ids: &[Uuid],
175 timestamp: SystemTime,
176 ) -> eyre::Result<Vec<CancelledSchedule>>;
177
178 async fn schedules_unschedulable(
180 &self,
181 schedule_ids: &[Uuid],
182 timestamp: SystemTime,
183 ) -> eyre::Result<()>;
184
185 async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>>;
201
202 async fn query_schedules(
206 &self,
207 cursor: Option<String>,
208 limit: usize,
209 filters: ScheduleQueryFilters,
210 order: ScheduleQueryOrder,
211 ) -> eyre::Result<ScheduleQueryResult>;
212
213 async fn query_schedule_ids(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;
215
216 async fn count_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<u64>;
218
219 async fn delete_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;
223}
224
225#[async_trait]
228pub trait StorageSnapshot {
229 fn export_snapshot(&self) -> BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>;
231
232 async fn import_snapshot(
238 &self,
239 snapshot: BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>,
240 ) -> eyre::Result<()>;
241}
242
243#[derive(Debug, Clone)]
245pub struct NewJob {
246 pub id: Uuid,
248 pub schedule_id: Option<Uuid>,
250 pub created_at: SystemTime,
252 pub job_type_id: String,
254 pub target_execution_time: SystemTime,
256 pub input_payload_json: String,
258 pub timeout_policy: JobTimeoutPolicy,
260 pub retry_policy: JobRetryPolicy,
262 pub labels: IndexMap<String, String>,
264 pub metadata_json: Option<String>,
266}
267
268#[derive(Debug, Clone, PartialEq, Eq)]
270pub struct PendingJob {
271 pub id: Uuid,
273 pub target_execution_time: SystemTime,
275 pub execution_count: u64,
277 pub retry_policy: JobRetryPolicy,
279 pub timeout_policy: JobTimeoutPolicy,
281}
282
283#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
285pub struct JobTimeoutPolicy {
286 pub timeout: Option<Duration>,
288 pub base_time: JobTimeoutBaseTime,
292}
293
294impl From<v1::JobTimeoutPolicy> for JobTimeoutPolicy {
295 fn from(proto: v1::JobTimeoutPolicy) -> Self {
296 Self {
297 timeout: proto.timeout.and_then(|d| d.try_into().ok()),
298 base_time: proto.base_time().into(),
299 }
300 }
301}
302
303impl From<JobTimeoutPolicy> for v1::JobTimeoutPolicy {
304 fn from(policy: JobTimeoutPolicy) -> Self {
305 Self {
306 timeout: policy.timeout.and_then(|d| d.try_into().ok()),
307 base_time: v1::JobTimeoutBaseTime::from(policy.base_time).into(),
308 }
309 }
310}
311
312impl From<v1::JobTimeoutBaseTime> for JobTimeoutBaseTime {
313 fn from(proto: v1::JobTimeoutBaseTime) -> Self {
314 match proto {
315 v1::JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
316 v1::JobTimeoutBaseTime::StartTime | v1::JobTimeoutBaseTime::Unspecified => {
317 Self::StartTime
318 }
319 }
320 }
321}
322
323impl From<JobTimeoutBaseTime> for v1::JobTimeoutBaseTime {
324 fn from(base_time: JobTimeoutBaseTime) -> Self {
325 match base_time {
326 JobTimeoutBaseTime::StartTime => Self::StartTime,
327 JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
328 }
329 }
330}
331
/// The point in time a job timeout is relative to.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum JobTimeoutBaseTime {
    /// The timeout is relative to the start time (this is the default).
    #[default]
    StartTime,
    /// The timeout is relative to the target execution time.
    TargetExecutionTime,
}
347
/// Retry settings of a job; the default allows no retries.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct JobRetryPolicy {
    /// Number of retries.
    pub retries: u64,
}
356
357impl From<v1::JobRetryPolicy> for JobRetryPolicy {
358 fn from(proto: v1::JobRetryPolicy) -> Self {
359 Self {
360 retries: proto.retries,
361 }
362 }
363}
364
365impl From<JobRetryPolicy> for v1::JobRetryPolicy {
366 fn from(policy: JobRetryPolicy) -> Self {
367 Self {
368 retries: policy.retries,
369 }
370 }
371}
372
373#[derive(Debug, Clone)]
375pub struct NewExecution {
376 pub id: Uuid,
378 pub job_id: Uuid,
380 pub attempt_number: u64,
382 pub target_execution_time: SystemTime,
384}
385
386#[derive(Debug, Clone, PartialEq, Eq)]
388pub struct PendingExecution {
389 pub id: Uuid,
391 pub target_execution_time: SystemTime,
393}
394
395#[derive(Debug, Clone, PartialEq, Eq)]
397pub struct ReadyExecution {
398 pub id: Uuid,
400 pub job_id: Uuid,
402 pub input_payload_json: String,
404 pub attempt_number: u64,
406 pub job_type_id: String,
408 pub target_execution_time: SystemTime,
410 pub timeout_policy: JobTimeoutPolicy,
412}
413
/// Description of a job type.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct JobType {
    /// ID of the job type.
    pub id: String,
    /// Name of the job type; may be empty.
    pub name: String,
    /// Description of the job type; may be empty.
    pub description: String,
    /// Optional schema of the input, as a JSON string.
    pub input_schema_json: Option<String>,
    /// Optional schema of the output, as a JSON string.
    pub output_schema_json: Option<String>,
}
428
429#[derive(Debug, Default, Clone, Serialize, Deserialize)]
431pub struct JobQueryFilters {
432 pub job_ids: Option<IndexSet<Uuid>>,
434 pub job_type_ids: Option<IndexSet<String>>,
436 pub execution_ids: Option<IndexSet<Uuid>>,
438 pub schedule_ids: Option<IndexSet<Uuid>>,
440 pub execution_status: Option<IndexSet<JobExecutionStatus>>,
442 pub labels: Option<IndexMap<String, JobLabelFilterValue>>,
444 pub active: Option<bool>,
446 pub created_after: Option<SystemTime>,
448 pub created_before: Option<SystemTime>,
450 pub target_execution_time_after: Option<SystemTime>,
452 pub target_execution_time_before: Option<SystemTime>,
454}
455
456#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
458pub enum JobQueryOrder {
459 CreatedAtAsc,
461 CreatedAtDesc,
463 TargetExecutionTimeAsc,
465 TargetExecutionTimeDesc,
467}
468
469#[derive(Debug, Clone)]
471pub struct JobQueryResult {
472 pub jobs: Vec<JobDetails>,
474 pub cursor: Option<String>,
476 pub has_more: bool,
478}
479
480#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
482pub enum JobExecutionStatus {
483 Pending,
485 Ready,
487 Assigned,
489 Running,
491 Succeeded,
493 Failed,
495}
496
497#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
499pub enum JobLabelFilterValue {
500 Exists,
502 Equals(String),
504}
505
506#[derive(Debug, Clone, PartialEq, Eq)]
508pub struct JobDetails {
509 pub active: bool,
511 pub cancelled: bool,
513 pub id: Uuid,
515 pub job_type_id: String,
517 pub schedule_id: Option<Uuid>,
519 pub target_execution_time: SystemTime,
523 pub input_payload_json: String,
525 pub labels: IndexMap<String, String>,
527 pub timeout_policy: JobTimeoutPolicy,
529 pub retry_policy: JobRetryPolicy,
531 pub created_at: SystemTime,
533 pub executions: Vec<ExecutionDetails>,
535 pub metadata_json: Option<String>,
537}
538
539#[derive(Debug, Clone, PartialEq, Eq)]
542pub struct ExecutionDetails {
543 pub id: Uuid,
545 pub job_id: Uuid,
547 pub executor_id: Option<Uuid>,
549 pub status: JobExecutionStatus,
551 pub created_at: SystemTime,
553 pub ready_at: Option<SystemTime>,
555 pub assigned_at: Option<SystemTime>,
557 pub started_at: Option<SystemTime>,
559 pub succeeded_at: Option<SystemTime>,
561 pub failed_at: Option<SystemTime>,
563 pub output_payload_json: Option<String>,
565 pub failure_reason: Option<String>,
567}
568
569#[derive(Debug, Clone, PartialEq, Eq)]
571pub struct CancelledJob {
572 pub id: Uuid,
574 pub active_execution: Option<Uuid>,
576}
577
578#[derive(Debug, Clone)]
580pub struct NewSchedule {
581 pub id: Uuid,
583 pub created_at: SystemTime,
585 pub job_timing_policy: ScheduleJobTimingPolicy,
587 pub job_creation_policy: ScheduleJobCreationPolicy,
589 pub labels: IndexMap<String, String>,
591 pub time_range: Option<ScheduleTimeRange>,
593 pub metadata_json: Option<String>,
595}
596
597#[derive(Debug, Clone, PartialEq, Eq)]
599pub enum ScheduleJobTimingPolicy {
600 Repeat(SchedulingPolicyRepeat),
602 Cron(SchedulingPolicyCron),
604}
605
606#[derive(Debug, Clone, PartialEq, Eq)]
608pub struct SchedulingPolicyRepeat {
609 pub interval: Duration,
611 pub immediate: bool,
613 pub missed_policy: ScheduleMissedTimePolicy,
615}
616
617#[derive(Debug, Clone, PartialEq, Eq)]
619pub struct SchedulingPolicyCron {
620 pub cron_expression: String,
622 pub immediate: bool,
624 pub missed_policy: ScheduleMissedTimePolicy,
626}
627
/// What a schedule does about missed times.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ScheduleMissedTimePolicy {
    /// Skip missed times.
    Skip,
    /// Create jobs for missed times.
    Create,
}
636
637#[derive(Debug, Clone, PartialEq, Eq)]
639pub enum ScheduleJobCreationPolicy {
640 JobDefinition(ScheduleNewJobDefinition),
642}
643
644#[derive(Debug, Clone, PartialEq, Eq)]
646pub struct ScheduleNewJobDefinition {
647 pub job_type_id: String,
649 pub input_payload_json: String,
651 pub timeout_policy: JobTimeoutPolicy,
653 pub retry_policy: JobRetryPolicy,
655 pub labels: IndexMap<String, String>,
657}
658
659#[derive(Debug, Clone, PartialEq, Eq)]
661pub struct CancelledSchedule {
662 pub id: Uuid,
664}
665
666#[derive(Debug, Clone, PartialEq, Eq)]
668pub struct PendingSchedule {
669 pub id: Uuid,
671 pub job_timing_policy: ScheduleJobTimingPolicy,
673 pub job_creation_policy: ScheduleJobCreationPolicy,
675 pub last_target_execution_time: Option<SystemTime>,
678 pub time_range: Option<ScheduleTimeRange>,
680}
681
682#[derive(Debug, Default, Clone, Serialize, Deserialize)]
684pub struct ScheduleQueryFilters {
685 pub schedule_ids: Option<IndexSet<Uuid>>,
687 pub job_ids: Option<IndexSet<Uuid>>,
689 pub job_type_ids: Option<IndexSet<String>>,
691 pub labels: Option<IndexMap<String, ScheduleLabelFilterValue>>,
693 pub active: Option<bool>,
695 pub created_after: Option<SystemTime>,
697 pub created_before: Option<SystemTime>,
699}
700
701#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
703pub enum ScheduleQueryOrder {
704 CreatedAtAsc,
706 CreatedAtDesc,
708}
709
710#[derive(Debug, Clone)]
712pub struct ScheduleQueryResult {
713 pub schedules: Vec<ScheduleDetails>,
715 pub cursor: Option<String>,
717 pub has_more: bool,
719}
720
721#[derive(Debug, Clone, PartialEq, Eq)]
727pub struct ScheduleDetails {
728 pub id: Uuid,
730 pub created_at: SystemTime,
732 pub job_timing_policy: ScheduleJobTimingPolicy,
734 pub job_creation_policy: ScheduleJobCreationPolicy,
736 pub labels: IndexMap<String, String>,
738 pub active: bool,
740 pub cancelled: bool,
742 pub time_range: Option<ScheduleTimeRange>,
744 pub metadata_json: Option<String>,
746}
747
748#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
750pub enum ScheduleLabelFilterValue {
751 Exists,
753 Equals(String),
755}
756
/// A time range with an optional start and an optional end.
///
/// As implemented by [`ScheduleTimeRange::contains`], the start is inclusive
/// and the end is exclusive; a missing bound leaves that side unbounded.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ScheduleTimeRange {
    /// Inclusive lower bound, if any.
    pub start: Option<SystemTime>,
    /// Exclusive upper bound, if any.
    pub end: Option<SystemTime>,
}

impl ScheduleTimeRange {
    /// Returns `true` when `time` is not before `start` (if set) and strictly
    /// before `end` (if set).
    #[must_use]
    pub fn contains(&self, time: SystemTime) -> bool {
        let not_before_start = self.start.map_or(true, |start| time >= start);
        let before_end = self.end.map_or(true, |end| time < end);

        not_before_start && before_end
    }

    /// Returns `false` only when both bounds are set and `start` is not
    /// strictly before `end`.
    #[must_use]
    pub fn is_valid(&self) -> bool {
        match (self.start, self.end) {
            (Some(start), Some(end)) => start < end,
            _ => true,
        }
    }
}
793
794impl From<ScheduleJobTimingPolicy> for common::v1::ScheduleJobTimingPolicy {
795 fn from(value: ScheduleJobTimingPolicy) -> Self {
796 match value {
797 ScheduleJobTimingPolicy::Repeat(policy) => common::v1::ScheduleJobTimingPolicy {
798 job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
799 common::v1::ScheduleJobTimingPolicyRepeat {
800 interval: Some(policy.interval.try_into().unwrap()),
801 immediate: policy.immediate,
802 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
803 policy.missed_policy,
804 )
805 .into(),
806 },
807 )),
808 },
809 ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
810 job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
811 common::v1::ScheduleJobTimingPolicyCron {
812 cron_expression: policy.cron_expression,
813 immediate: policy.immediate,
814 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
815 policy.missed_policy,
816 )
817 .into(),
818 },
819 )),
820 },
821 }
822 }
823}
824
825impl TryFrom<common::v1::ScheduleJobTimingPolicy> for ScheduleJobTimingPolicy {
826 type Error = Status;
827
828 fn try_from(value: common::v1::ScheduleJobTimingPolicy) -> Result<Self, Self::Error> {
829 let value = value.job_timing.ok_or_else(|| {
830 Status::invalid_argument("missing job timing policy in schedule definition")
831 })?;
832
833 match value {
834 schedule_job_timing_policy::JobTiming::Repeat(policy) => {
835 Ok(Self::Repeat(SchedulingPolicyRepeat {
836 interval: policy
837 .interval
838 .ok_or_else(|| {
839 Status::invalid_argument("missing interval in repeat policy")
840 })?
841 .try_into()
842 .map_err(|_| Status::invalid_argument("invalid interval"))?,
843 immediate: policy.immediate,
844 missed_policy: policy.missed_time_policy().into(),
845 }))
846 }
847 schedule_job_timing_policy::JobTiming::Cron(policy) => {
848 Ok(Self::Cron(SchedulingPolicyCron {
849 missed_policy: policy.missed_time_policy().into(),
850 cron_expression: {
851 cronexpr::parse_crontab(&policy.cron_expression).map_err(|err| {
853 Status::invalid_argument(format!("invalid cron expression: {err}"))
854 })?;
855
856 policy.cron_expression
857 },
858 immediate: policy.immediate,
859 }))
860 }
861 }
862 }
863}
864
865impl From<ScheduleJobCreationPolicy> for common::v1::ScheduleJobCreationPolicy {
866 fn from(value: ScheduleJobCreationPolicy) -> Self {
867 match value {
868 ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
869 common::v1::ScheduleJobCreationPolicy {
870 job_creation: Some(JobCreation::JobDefinition(common::v1::JobDefinition {
871 job_type_id: job_definition.job_type_id,
872 input_payload_json: job_definition.input_payload_json,
873 target_execution_time: None,
874 retry_policy: Some(common::v1::JobRetryPolicy {
875 retries: job_definition.retry_policy.retries,
876 }),
877 timeout_policy: Some(common::v1::JobTimeoutPolicy {
878 timeout: job_definition
879 .timeout_policy
880 .timeout
881 .and_then(|d| d.try_into().ok()),
882 base_time: match job_definition.timeout_policy.base_time {
883 JobTimeoutBaseTime::StartTime => {
884 common::v1::JobTimeoutBaseTime::StartTime.into()
885 }
886 JobTimeoutBaseTime::TargetExecutionTime => {
887 common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
888 }
889 },
890 }),
891 labels: job_definition
892 .labels
893 .into_iter()
894 .map(|(key, value)| common::v1::JobLabel { key, value })
895 .collect(),
896 metadata_json: None,
897 })),
898 }
899 }
900 }
901 }
902}
903
904impl TryFrom<common::v1::ScheduleJobCreationPolicy> for ScheduleJobCreationPolicy {
905 type Error = Status;
906
907 fn try_from(value: common::v1::ScheduleJobCreationPolicy) -> Result<Self, Self::Error> {
908 let value = value.job_creation.ok_or_else(|| {
909 Status::invalid_argument("missing job creation policy in schedule definition")
910 })?;
911
912 match value {
913 JobCreation::JobDefinition(job_definition) => {
914 Ok(Self::JobDefinition(ScheduleNewJobDefinition {
915 job_type_id: job_definition.job_type_id,
916 input_payload_json: job_definition.input_payload_json,
917 timeout_policy: job_definition
918 .timeout_policy
919 .map(Into::into)
920 .unwrap_or_default(),
921 retry_policy: job_definition
922 .retry_policy
923 .map(Into::into)
924 .unwrap_or_default(),
925 labels: job_definition
926 .labels
927 .into_iter()
928 .map(|label| (label.key, label.value))
929 .collect(),
930 }))
931 }
932 }
933 }
934}
935
936impl From<ScheduleTimeRange> for TimeRange {
937 fn from(value: ScheduleTimeRange) -> Self {
938 Self {
939 start: value.start.map(Into::into),
940 end: value.end.map(Into::into),
941 }
942 }
943}
944
945impl TryFrom<TimeRange> for ScheduleTimeRange {
946 type Error = Status;
947
948 fn try_from(value: TimeRange) -> Result<Self, Self::Error> {
949 Ok(Self {
950 start: value
951 .start
952 .map(SystemTime::try_from)
953 .transpose()
954 .map_err(|error| {
955 Status::invalid_argument(format!("unsupported timestamp: {error}"))
956 })?,
957 end: value
958 .end
959 .map(SystemTime::try_from)
960 .transpose()
961 .map_err(|error| {
962 Status::invalid_argument(format!("unsupported timestamp: {error}"))
963 })?,
964 })
965 }
966}
967
968impl From<common::v1::ScheduleMissedTimePolicy> for ScheduleMissedTimePolicy {
969 fn from(value: common::v1::ScheduleMissedTimePolicy) -> Self {
970 match value {
971 common::v1::ScheduleMissedTimePolicy::Skip
972 | common::v1::ScheduleMissedTimePolicy::Unspecified => Self::Skip,
973 common::v1::ScheduleMissedTimePolicy::Create => Self::Create,
974 }
975 }
976}
977
978impl From<ScheduleMissedTimePolicy> for common::v1::ScheduleMissedTimePolicy {
979 fn from(value: ScheduleMissedTimePolicy) -> Self {
980 match value {
981 ScheduleMissedTimePolicy::Skip => Self::Skip,
982 ScheduleMissedTimePolicy::Create => Self::Create,
983 }
984 }
985}
986
987impl From<server::v1::JobQueryOrder> for JobQueryOrder {
988 fn from(value: server::v1::JobQueryOrder) -> Self {
989 match value {
990 server::v1::JobQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
991 server::v1::JobQueryOrder::CreatedAtDesc | server::v1::JobQueryOrder::Unspecified => {
992 Self::CreatedAtDesc
993 }
994 server::v1::JobQueryOrder::TargetExecutionTimeAsc => Self::TargetExecutionTimeAsc,
995 server::v1::JobQueryOrder::TargetExecutionTimeDesc => Self::TargetExecutionTimeDesc,
996 }
997 }
998}
999
1000impl From<JobDetails> for Job {
1001 fn from(job: JobDetails) -> Self {
1002 Job {
1003 id: job.id.to_string(),
1004 schedule_id: job.schedule_id.map(|t| t.to_string()),
1005 cancelled: job.cancelled,
1006 active: job.active,
1007 definition: Some(JobDefinition {
1008 job_type_id: job.job_type_id,
1009 input_payload_json: job.input_payload_json,
1010 target_execution_time: Some(job.target_execution_time.into()),
1011 retry_policy: Some(common::v1::JobRetryPolicy {
1012 retries: job.retry_policy.retries,
1013 }),
1014 timeout_policy: Some(common::v1::JobTimeoutPolicy {
1015 timeout: job.timeout_policy.timeout.and_then(|d| d.try_into().ok()),
1016 base_time: match job.timeout_policy.base_time {
1017 JobTimeoutBaseTime::StartTime => {
1018 common::v1::JobTimeoutBaseTime::StartTime.into()
1019 }
1020 JobTimeoutBaseTime::TargetExecutionTime => {
1021 common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
1022 }
1023 },
1024 }),
1025 labels: job
1026 .labels
1027 .into_iter()
1028 .map(|(key, value)| common::v1::JobLabel { key, value })
1029 .collect(),
1030 metadata_json: job.metadata_json,
1031 }),
1032 created_at: Some(job.created_at.into()),
1033 executions: job.executions.into_iter().map(Into::into).collect(),
1034 }
1035 }
1036}
1037
1038impl From<ExecutionDetails> for server::v1::JobExecution {
1039 fn from(value: ExecutionDetails) -> Self {
1040 Self {
1041 id: value.id.to_string(),
1042 job_id: value.job_id.to_string(),
1043 executor_id: value.executor_id.map(|t| t.to_string()),
1044 status: match value.status {
1045 JobExecutionStatus::Pending => server::v1::JobExecutionStatus::Pending,
1046 JobExecutionStatus::Ready => server::v1::JobExecutionStatus::Ready,
1047 JobExecutionStatus::Assigned => server::v1::JobExecutionStatus::Assigned,
1048 JobExecutionStatus::Running => server::v1::JobExecutionStatus::Running,
1049 JobExecutionStatus::Succeeded => server::v1::JobExecutionStatus::Succeeded,
1050 JobExecutionStatus::Failed => server::v1::JobExecutionStatus::Failed,
1051 }
1052 .into(),
1053 created_at: Some(value.created_at.into()),
1054 ready_at: value.ready_at.map(Into::into),
1055 assigned_at: value.assigned_at.map(Into::into),
1056 started_at: value.started_at.map(Into::into),
1057 succeeded_at: value.succeeded_at.map(Into::into),
1058 failed_at: value.failed_at.map(Into::into),
1059 output_payload_json: value.output_payload_json,
1060 failure_reason: value.failure_reason,
1061 }
1062 }
1063}
1064
1065impl From<JobType> for ora_proto::common::v1::JobType {
1066 fn from(value: JobType) -> Self {
1067 Self {
1068 id: value.id,
1069 name: if value.name.is_empty() {
1070 None
1071 } else {
1072 Some(value.name)
1073 },
1074 description: if value.description.is_empty() {
1075 None
1076 } else {
1077 Some(value.description)
1078 },
1079 input_schema_json: value.input_schema_json,
1080 output_schema_json: value.output_schema_json,
1081 }
1082 }
1083}
1084
1085impl TryFrom<server::v1::JobQueryFilter> for JobQueryFilters {
1086 type Error = Status;
1087
1088 fn try_from(filter: server::v1::JobQueryFilter) -> Result<Self, Self::Error> {
1089 Ok(Self {
1090 execution_status: {
1091 let status = filter
1092 .status()
1093 .filter_map(|status| match status {
1094 server::v1::JobExecutionStatus::Unspecified => None,
1095 server::v1::JobExecutionStatus::Pending => {
1096 Some(JobExecutionStatus::Pending)
1097 }
1098 server::v1::JobExecutionStatus::Ready => Some(JobExecutionStatus::Ready),
1099 server::v1::JobExecutionStatus::Assigned => {
1100 Some(JobExecutionStatus::Assigned)
1101 }
1102 server::v1::JobExecutionStatus::Running => {
1103 Some(JobExecutionStatus::Running)
1104 }
1105 server::v1::JobExecutionStatus::Succeeded => {
1106 Some(JobExecutionStatus::Succeeded)
1107 }
1108 server::v1::JobExecutionStatus::Failed => Some(JobExecutionStatus::Failed),
1109 })
1110 .collect::<IndexSet<_>>();
1111
1112 if status.is_empty() {
1113 None
1114 } else {
1115 Some(status)
1116 }
1117 },
1118 job_ids: if filter.job_ids.is_empty() {
1119 None
1120 } else {
1121 Some(
1122 filter
1123 .job_ids
1124 .iter()
1125 .map(|f| {
1126 f.parse().map_err(|err| {
1127 Status::invalid_argument(format!("invalid job ID: {err}"))
1128 })
1129 })
1130 .collect::<Result<_, _>>()?,
1131 )
1132 },
1133 job_type_ids: if filter.job_type_ids.is_empty() {
1134 None
1135 } else {
1136 Some(filter.job_type_ids.into_iter().collect())
1137 },
1138 execution_ids: if filter.execution_ids.is_empty() {
1139 None
1140 } else {
1141 Some(
1142 filter
1143 .execution_ids
1144 .iter()
1145 .map(|f| {
1146 f.parse().map_err(|err| {
1147 Status::invalid_argument(format!("invalid execution ID: {err}"))
1148 })
1149 })
1150 .collect::<Result<_, _>>()?,
1151 )
1152 },
1153 schedule_ids: if filter.schedule_ids.is_empty() {
1154 None
1155 } else {
1156 Some(
1157 filter
1158 .schedule_ids
1159 .iter()
1160 .map(|f| {
1161 f.parse().map_err(|err| {
1162 Status::invalid_argument(format!("invalid schedule ID: {err}"))
1163 })
1164 })
1165 .collect::<Result<_, _>>()?,
1166 )
1167 },
1168 labels: if filter.labels.is_empty() {
1169 None
1170 } else {
1171 Some(
1172 filter
1173 .labels
1174 .into_iter()
1175 .filter_map(|label| {
1176 Some((
1177 label.key,
1178 match label.value? {
1179 server::v1::job_label_filter::Value::Exists(_) => {
1180 JobLabelFilterValue::Exists
1181 }
1182 server::v1::job_label_filter::Value::Equals(value) => {
1183 JobLabelFilterValue::Equals(value)
1184 }
1185 },
1186 ))
1187 })
1188 .collect(),
1189 )
1190 },
1191 active: filter.active,
1192 created_after: filter
1193 .created_at
1194 .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1195 created_before: filter
1196 .created_at
1197 .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1198 target_execution_time_after: filter
1199 .target_execution_time
1200 .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1201 target_execution_time_before: filter
1202 .target_execution_time
1203 .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1204 })
1205 }
1206}
1207
1208impl From<ScheduleDetails> for server::v1::Schedule {
1209 fn from(value: ScheduleDetails) -> Self {
1210 Self {
1211 id: value.id.to_string(),
1212 definition: Some(common::v1::ScheduleDefinition {
1213 job_timing_policy: Some(match value.job_timing_policy {
1214 ScheduleJobTimingPolicy::Repeat(policy) => {
1215 common::v1::ScheduleJobTimingPolicy {
1216 job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
1217 common::v1::ScheduleJobTimingPolicyRepeat {
1218 interval: policy.interval.try_into().ok(),
1219 immediate: policy.immediate,
1220 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1221 policy.missed_policy,
1222 )
1223 .into(),
1224 },
1225 )),
1226 }
1227 }
1228 ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
1229 job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
1230 common::v1::ScheduleJobTimingPolicyCron {
1231 cron_expression: policy.cron_expression,
1232 immediate: policy.immediate,
1233 missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1234 policy.missed_policy,
1235 )
1236 .into(),
1237 },
1238 )),
1239 },
1240 }),
1241 job_creation_policy: Some(common::v1::ScheduleJobCreationPolicy {
1242 job_creation: Some(match value.job_creation_policy {
1243 ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
1244 JobCreation::JobDefinition(common::v1::JobDefinition {
1245 job_type_id: job_definition.job_type_id,
1246 input_payload_json: job_definition.input_payload_json,
1247 target_execution_time: None,
1248 retry_policy: Some(common::v1::JobRetryPolicy {
1249 retries: job_definition.retry_policy.retries,
1250 }),
1251 timeout_policy: Some(common::v1::JobTimeoutPolicy {
1252 timeout: job_definition
1253 .timeout_policy
1254 .timeout
1255 .and_then(|d| d.try_into().ok()),
1256 base_time: match job_definition.timeout_policy.base_time {
1257 JobTimeoutBaseTime::StartTime => {
1258 common::v1::JobTimeoutBaseTime::StartTime.into()
1259 }
1260 JobTimeoutBaseTime::TargetExecutionTime => {
1261 common::v1::JobTimeoutBaseTime::TargetExecutionTime
1262 .into()
1263 }
1264 },
1265 }),
1266 labels: job_definition
1267 .labels
1268 .into_iter()
1269 .map(|(key, value)| common::v1::JobLabel { key, value })
1270 .collect(),
1271 metadata_json: None,
1272 })
1273 }
1274 }),
1275 }),
1276 time_range: value.time_range.map(|range| TimeRange {
1277 start: range.start.map(Into::into),
1278 end: range.end.map(Into::into),
1279 }),
1280 labels: value
1281 .labels
1282 .into_iter()
1283 .map(|(key, value)| common::v1::ScheduleLabel { key, value })
1284 .collect(),
1285 metadata_json: value.metadata_json,
1286 }),
1287 created_at: Some(value.created_at.into()),
1288 active: value.active,
1289 cancelled: value.cancelled,
1290 }
1291 }
1292}
1293
1294impl From<server::v1::ScheduleQueryOrder> for ScheduleQueryOrder {
1295 fn from(value: server::v1::ScheduleQueryOrder) -> Self {
1296 match value {
1297 server::v1::ScheduleQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
1298 server::v1::ScheduleQueryOrder::CreatedAtDesc
1299 | server::v1::ScheduleQueryOrder::Unspecified => Self::CreatedAtDesc,
1300 }
1301 }
1302}
1303
1304impl TryFrom<server::v1::ScheduleQueryFilter> for ScheduleQueryFilters {
1305 type Error = Status;
1306
1307 fn try_from(value: server::v1::ScheduleQueryFilter) -> Result<Self, Self::Error> {
1308 Ok(Self {
1309 schedule_ids: if value.schedule_ids.is_empty() {
1310 None
1311 } else {
1312 Some(
1313 value
1314 .schedule_ids
1315 .iter()
1316 .map(|f| {
1317 f.parse().map_err(|err| {
1318 Status::invalid_argument(format!("invalid schedule ID: {err}"))
1319 })
1320 })
1321 .collect::<Result<_, _>>()?,
1322 )
1323 },
1324 job_ids: if value.job_ids.is_empty() {
1325 None
1326 } else {
1327 Some(
1328 value
1329 .job_ids
1330 .iter()
1331 .map(|f| {
1332 f.parse().map_err(|err| {
1333 Status::invalid_argument(format!("invalid job ID: {err}"))
1334 })
1335 })
1336 .collect::<Result<_, _>>()?,
1337 )
1338 },
1339 job_type_ids: if value.job_type_ids.is_empty() {
1340 None
1341 } else {
1342 Some(value.job_type_ids.into_iter().collect())
1343 },
1344 labels: if value.labels.is_empty() {
1345 None
1346 } else {
1347 Some(
1348 value
1349 .labels
1350 .into_iter()
1351 .filter_map(|label| {
1352 Some((
1353 label.key,
1354 match label.value? {
1355 server::v1::schedule_label_filter::Value::Exists(_) => {
1356 ScheduleLabelFilterValue::Exists
1357 }
1358 server::v1::schedule_label_filter::Value::Equals(value) => {
1359 ScheduleLabelFilterValue::Equals(value)
1360 }
1361 },
1362 ))
1363 })
1364 .collect(),
1365 )
1366 },
1367 active: value.active,
1368 created_after: value
1369 .created_at
1370 .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1371 created_before: value
1372 .created_at
1373 .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1374 })
1375 }
1376}
1377
1378impl From<JobType> for snapshot::v1::ExportedJobType {
1379 fn from(job_type: JobType) -> Self {
1380 snapshot::v1::ExportedJobType {
1381 job_type: Some(common::v1::JobType {
1382 id: job_type.id,
1383 name: Some(job_type.name),
1384 description: Some(job_type.description),
1385 input_schema_json: job_type.input_schema_json,
1386 output_schema_json: job_type.output_schema_json,
1387 }),
1388 }
1389 }
1390}
1391
1392impl From<snapshot::v1::ExportedJobType> for JobType {
1393 fn from(job_type: snapshot::v1::ExportedJobType) -> Self {
1394 let job_type = job_type.job_type.unwrap();
1395
1396 Self {
1397 id: job_type.id,
1398 name: job_type.name.unwrap_or_default(),
1399 description: job_type.description.unwrap_or_default(),
1400 input_schema_json: job_type.input_schema_json,
1401 output_schema_json: job_type.output_schema_json,
1402 }
1403 }
1404}