Skip to main content

ora_server/grpc/
conv.rs

1use ora_backend::jobs::JobTypeId;
2use tonic::Status;
3use uuid::Uuid;
4
5use crate::proto;
6
7pub(super) trait ResultErrExt<T>: Sized {
8    fn err_status(self) -> Result<T, Status>;
9}
10
11impl<T, E> ResultErrExt<T> for Result<T, E>
12where
13    E: std::error::Error,
14{
15    fn err_status(self) -> Result<T, Status> {
16        self.map_err(backend_err_to_status)
17    }
18}
19
20fn backend_err_to_status<E>(err: E) -> Status
21where
22    E: std::error::Error,
23{
24    Status::internal(err.to_string())
25}
26
27impl TryFrom<proto::jobs::v1::JobType> for ora_backend::jobs::JobType {
28    type Error = tonic::Status;
29
30    fn try_from(value: proto::jobs::v1::JobType) -> Result<Self, Self::Error> {
31        Ok(Self {
32            id: JobTypeId::new(value.id).map_err(|e| Status::invalid_argument(e.to_string()))?,
33            description: value.description,
34            input_schema_json: value.input_schema_json,
35            output_schema_json: value.output_schema_json,
36        })
37    }
38}
39
40impl From<ora_backend::jobs::JobType> for proto::jobs::v1::JobType {
41    fn from(value: ora_backend::jobs::JobType) -> Self {
42        Self {
43            id: value.id.into_inner(),
44            description: value.description,
45            input_schema_json: value.input_schema_json,
46            output_schema_json: value.output_schema_json,
47        }
48    }
49}
50
51impl TryFrom<proto::admin::v1::JobFilters> for ora_backend::jobs::JobFilters {
52    type Error = tonic::Status;
53
54    fn try_from(value: proto::admin::v1::JobFilters) -> Result<Self, Self::Error> {
55        Ok(Self {
56            execution_statuses: if value.execution_statuses.is_empty() {
57                None
58            } else {
59                Some(
60                    value
61                        .execution_statuses()
62                        .map(Into::into)
63                        .collect::<Vec<_>>(),
64                )
65            },
66            job_ids: if value.job_ids.is_empty() {
67                None
68            } else {
69                Some(
70                    value
71                        .job_ids
72                        .into_iter()
73                        .map(|id| {
74                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
75                                Status::invalid_argument(format!("invalid job ID '{id}': {e}"))
76                            })
77                        })
78                        .collect::<Result<_, _>>()?,
79                )
80            },
81            job_type_ids: if value.job_type_ids.is_empty() {
82                None
83            } else {
84                Some(
85                    value
86                        .job_type_ids
87                        .into_iter()
88                        .map(|id| {
89                            JobTypeId::new(id).map_err(|e| Status::invalid_argument(e.to_string()))
90                        })
91                        .collect::<Result<_, _>>()?,
92                )
93            },
94            executor_ids: if value.executor_ids.is_empty() {
95                None
96            } else {
97                Some(
98                    value
99                        .executor_ids
100                        .into_iter()
101                        .map(|id| {
102                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
103                                Status::invalid_argument(format!("invalid executor ID '{id}': {e}"))
104                            })
105                        })
106                        .collect::<Result<_, _>>()?,
107                )
108            },
109            target_execution_time: match value.target_execution_time {
110                Some(range) => Some(range.try_into()?),
111                None => None,
112            },
113            created_at: match value.created_at {
114                Some(range) => Some(range.try_into()?),
115                None => None,
116            },
117            labels: if value.labels.is_empty() {
118                None
119            } else {
120                Some(value.labels.into_iter().map(Into::into).collect::<Vec<_>>())
121            },
122            execution_ids: if value.execution_ids.is_empty() {
123                None
124            } else {
125                Some(
126                    value
127                        .execution_ids
128                        .into_iter()
129                        .map(|id| {
130                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
131                                Status::invalid_argument(format!(
132                                    "invalid execution ID '{id}': {e}"
133                                ))
134                            })
135                        })
136                        .collect::<Result<_, _>>()?,
137                )
138            },
139            schedule_ids: if value.schedule_ids.is_empty() {
140                None
141            } else {
142                Some(
143                    value
144                        .schedule_ids
145                        .into_iter()
146                        .map(|id| {
147                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
148                                Status::invalid_argument(format!("invalid schedule ID '{id}': {e}"))
149                            })
150                        })
151                        .collect::<Result<_, _>>()?,
152                )
153            },
154        })
155    }
156}
157
158impl TryFrom<proto::admin::v1::JobOrderBy> for Option<ora_backend::jobs::JobOrderBy> {
159    type Error = tonic::Status;
160
161    fn try_from(value: proto::admin::v1::JobOrderBy) -> Result<Self, Self::Error> {
162        match value {
163            proto::admin::v1::JobOrderBy::Unspecified => Ok(None),
164            proto::admin::v1::JobOrderBy::CreatedAtAsc => {
165                Ok(Some(ora_backend::jobs::JobOrderBy::CreatedAtAsc))
166            }
167            proto::admin::v1::JobOrderBy::CreatedAtDesc => {
168                Ok(Some(ora_backend::jobs::JobOrderBy::CreatedAtDesc))
169            }
170            proto::admin::v1::JobOrderBy::TargetExecutionTimeAsc => {
171                Ok(Some(ora_backend::jobs::JobOrderBy::TargetExecutionTimeAsc))
172            }
173            proto::admin::v1::JobOrderBy::TargetExecutionTimeDesc => {
174                Ok(Some(ora_backend::jobs::JobOrderBy::TargetExecutionTimeDesc))
175            }
176        }
177    }
178}
179
180impl TryFrom<proto::common::v1::TimeRange> for ora_backend::common::TimeRange {
181    type Error = tonic::Status;
182
183    fn try_from(value: proto::common::v1::TimeRange) -> Result<Self, Self::Error> {
184        Ok(Self {
185            start: match value.start {
186                Some(ts) => {
187                    Some(ts.try_into().map_err(|e| {
188                        Status::invalid_argument(format!("invalid start time: {e}"))
189                    })?)
190                }
191                None => None,
192            },
193            end: match value.end {
194                Some(ts) => Some(
195                    ts.try_into()
196                        .map_err(|e| Status::invalid_argument(format!("invalid end time: {e}")))?,
197                ),
198                None => None,
199            },
200        })
201    }
202}
203
204impl From<ora_backend::common::TimeRange> for proto::common::v1::TimeRange {
205    fn from(value: ora_backend::common::TimeRange) -> Self {
206        Self {
207            start: value.start.map(Into::into),
208            end: value.end.map(Into::into),
209        }
210    }
211}
212
213impl From<proto::common::v1::Label> for ora_backend::common::Label {
214    fn from(value: proto::common::v1::Label) -> Self {
215        Self {
216            key: value.key,
217            value: value.value,
218        }
219    }
220}
221
222impl From<ora_backend::common::Label> for proto::common::v1::Label {
223    fn from(value: ora_backend::common::Label) -> Self {
224        Self {
225            key: value.key,
226            value: value.value,
227        }
228    }
229}
230
231impl From<proto::common::v1::LabelFilter> for ora_backend::common::LabelFilter {
232    fn from(value: proto::common::v1::LabelFilter) -> Self {
233        Self {
234            key: value.key,
235            value: value.value,
236        }
237    }
238}
239
240impl From<ora_backend::common::LabelFilter> for proto::common::v1::LabelFilter {
241    fn from(value: ora_backend::common::LabelFilter) -> Self {
242        Self {
243            key: value.key,
244            value: value.value,
245        }
246    }
247}
248
249impl TryFrom<proto::jobs::v1::Job> for ora_backend::jobs::JobDefinition {
250    type Error = tonic::Status;
251
252    fn try_from(value: proto::jobs::v1::Job) -> Result<Self, Self::Error> {
253        Ok(Self {
254            job_type_id: JobTypeId::new(value.job_type_id)
255                .map_err(|e| Status::invalid_argument(e.to_string()))?,
256            target_execution_time: value
257                .target_execution_time
258                .ok_or_else(|| Status::invalid_argument("missing target_execution_time"))?
259                .try_into()
260                .map_err(|e| {
261                    Status::invalid_argument(format!("invalid target_execution_time: {e}"))
262                })?,
263            input_payload_json: value.input_payload_json,
264            timeout_policy: value.timeout_policy.unwrap_or_default().into(),
265            retry_policy: value.retry_policy.unwrap_or_default().into(),
266            labels: value.labels.into_iter().map(Into::into).collect(),
267        })
268    }
269}
270
271impl From<ora_backend::jobs::JobDefinition> for proto::jobs::v1::Job {
272    fn from(value: ora_backend::jobs::JobDefinition) -> Self {
273        Self {
274            job_type_id: value.job_type_id.into_inner(),
275            target_execution_time: Some(value.target_execution_time.into()),
276            input_payload_json: value.input_payload_json,
277            timeout_policy: Some(value.timeout_policy.into()),
278            retry_policy: Some(value.retry_policy.into()),
279            labels: value.labels.into_iter().map(Into::into).collect(),
280        }
281    }
282}
283
284impl From<proto::jobs::v1::TimeoutPolicy> for ora_backend::jobs::TimeoutPolicy {
285    fn from(value: proto::jobs::v1::TimeoutPolicy) -> Self {
286        Self {
287            timeout: value
288                .timeout
289                .unwrap_or_default()
290                .try_into()
291                .unwrap_or_default(),
292            base_time: value.base_time().into(),
293        }
294    }
295}
296
297impl From<ora_backend::jobs::TimeoutPolicy> for proto::jobs::v1::TimeoutPolicy {
298    fn from(value: ora_backend::jobs::TimeoutPolicy) -> Self {
299        Self {
300            timeout: Some(prost_types::Duration::try_from(value.timeout).unwrap_or_default()),
301            base_time: proto::jobs::v1::TimeoutBaseTime::from(value.base_time).into(),
302        }
303    }
304}
305
306impl From<proto::jobs::v1::TimeoutBaseTime> for ora_backend::jobs::TimeoutBaseTime {
307    fn from(value: proto::jobs::v1::TimeoutBaseTime) -> Self {
308        match value {
309            proto::jobs::v1::TimeoutBaseTime::Unspecified
310            | proto::jobs::v1::TimeoutBaseTime::StartTime => {
311                ora_backend::jobs::TimeoutBaseTime::StartTime
312            }
313            proto::jobs::v1::TimeoutBaseTime::TargetExecutionTime => {
314                ora_backend::jobs::TimeoutBaseTime::TargetExecutionTime
315            }
316        }
317    }
318}
319
320impl From<ora_backend::jobs::TimeoutBaseTime> for proto::jobs::v1::TimeoutBaseTime {
321    fn from(value: ora_backend::jobs::TimeoutBaseTime) -> Self {
322        match value {
323            ora_backend::jobs::TimeoutBaseTime::StartTime => {
324                proto::jobs::v1::TimeoutBaseTime::StartTime
325            }
326            ora_backend::jobs::TimeoutBaseTime::TargetExecutionTime => {
327                proto::jobs::v1::TimeoutBaseTime::TargetExecutionTime
328            }
329        }
330    }
331}
332
333impl From<proto::jobs::v1::RetryPolicy> for ora_backend::jobs::RetryPolicy {
334    fn from(value: proto::jobs::v1::RetryPolicy) -> Self {
335        Self {
336            retries: value.retries,
337            backoff_duration: value
338                .backoff_duration
339                .unwrap_or_default()
340                .try_into()
341                .unwrap_or_default(),
342            max_backoff_duration: value
343                .max_backoff_duration
344                .map(|d| d.try_into().unwrap_or_default()),
345            backoff_strategy: value.backoff_strategy().into(),
346        }
347    }
348}
349
350impl From<ora_backend::jobs::RetryPolicy> for proto::jobs::v1::RetryPolicy {
351    fn from(value: ora_backend::jobs::RetryPolicy) -> Self {
352        Self {
353            retries: value.retries,
354            backoff_duration: Some(
355                prost_types::Duration::try_from(value.backoff_duration).unwrap_or_default(),
356            ),
357            max_backoff_duration: value
358                .max_backoff_duration
359                .map(|d| prost_types::Duration::try_from(d).unwrap_or_default()),
360            backoff_strategy: proto::jobs::v1::BackoffStrategy::from(value.backoff_strategy).into(),
361        }
362    }
363}
364
365impl From<proto::jobs::v1::BackoffStrategy> for ora_backend::jobs::BackoffStrategy {
366    fn from(value: proto::jobs::v1::BackoffStrategy) -> Self {
367        match value {
368            proto::jobs::v1::BackoffStrategy::Unspecified
369            | proto::jobs::v1::BackoffStrategy::Fixed => ora_backend::jobs::BackoffStrategy::Fixed,
370            proto::jobs::v1::BackoffStrategy::Exponential => {
371                ora_backend::jobs::BackoffStrategy::Exponential
372            }
373        }
374    }
375}
376
377impl From<ora_backend::jobs::BackoffStrategy> for proto::jobs::v1::BackoffStrategy {
378    fn from(value: ora_backend::jobs::BackoffStrategy) -> Self {
379        match value {
380            ora_backend::jobs::BackoffStrategy::Fixed => proto::jobs::v1::BackoffStrategy::Fixed,
381            ora_backend::jobs::BackoffStrategy::Exponential => {
382                proto::jobs::v1::BackoffStrategy::Exponential
383            }
384        }
385    }
386}
387
388impl From<ora_backend::executions::ExecutionStatus> for proto::admin::v1::ExecutionStatus {
389    fn from(value: ora_backend::executions::ExecutionStatus) -> Self {
390        match value {
391            ora_backend::executions::ExecutionStatus::Pending => {
392                proto::admin::v1::ExecutionStatus::Pending
393            }
394            ora_backend::executions::ExecutionStatus::InProgress => {
395                proto::admin::v1::ExecutionStatus::InProgress
396            }
397            ora_backend::executions::ExecutionStatus::Succeeded => {
398                proto::admin::v1::ExecutionStatus::Succeeded
399            }
400            ora_backend::executions::ExecutionStatus::Failed => {
401                proto::admin::v1::ExecutionStatus::Failed
402            }
403            ora_backend::executions::ExecutionStatus::Cancelled => {
404                proto::admin::v1::ExecutionStatus::Cancelled
405            }
406        }
407    }
408}
409
410impl From<proto::admin::v1::ExecutionStatus> for ora_backend::executions::ExecutionStatus {
411    fn from(value: proto::admin::v1::ExecutionStatus) -> Self {
412        match value {
413            proto::admin::v1::ExecutionStatus::Pending
414            | proto::admin::v1::ExecutionStatus::Unspecified => {
415                ora_backend::executions::ExecutionStatus::Pending
416            }
417            proto::admin::v1::ExecutionStatus::InProgress => {
418                ora_backend::executions::ExecutionStatus::InProgress
419            }
420            proto::admin::v1::ExecutionStatus::Succeeded => {
421                ora_backend::executions::ExecutionStatus::Succeeded
422            }
423            proto::admin::v1::ExecutionStatus::Failed => {
424                ora_backend::executions::ExecutionStatus::Failed
425            }
426            proto::admin::v1::ExecutionStatus::Cancelled => {
427                ora_backend::executions::ExecutionStatus::Cancelled
428            }
429        }
430    }
431}
432
433impl From<ora_backend::executions::ExecutionDetails> for proto::admin::v1::Execution {
434    fn from(value: ora_backend::executions::ExecutionDetails) -> Self {
435        Self {
436            id: value.id.to_string(),
437            executor_id: value.executor_id.map(|id| id.0.to_string()),
438            status: proto::admin::v1::ExecutionStatus::from(value.status).into(),
439            created_at: Some(value.created_at.into()),
440            started_at: value.started_at.map(Into::into),
441            succeeded_at: value.succeeded_at.map(Into::into),
442            failed_at: value.failed_at.map(Into::into),
443            cancelled_at: value.cancelled_at.map(Into::into),
444            output_json: value.output_json,
445            failure_reason: value.failure_reason,
446            target_execution_time: Some(value.target_execution_time.into()),
447        }
448    }
449}
450
451impl From<ora_backend::jobs::JobDetails> for proto::admin::v1::Job {
452    fn from(value: ora_backend::jobs::JobDetails) -> Self {
453        Self {
454            id: value.id.to_string(),
455            job: Some(value.job.into()),
456            schedule_id: value.schedule_id.map(|id| id.0.to_string()),
457            created_at: Some(value.created_at.into()),
458            executions: value.executions.into_iter().map(Into::into).collect(),
459        }
460    }
461}
462
463impl TryFrom<proto::schedules::v1::Schedule> for ora_backend::schedules::ScheduleDefinition {
464    type Error = tonic::Status;
465
466    fn try_from(value: proto::schedules::v1::Schedule) -> Result<Self, Self::Error> {
467        let scheduling_policy = value
468            .scheduling
469            .unwrap_or_default()
470            .policy
471            .ok_or_else(|| tonic::Status::invalid_argument("missing scheduling policy"))?;
472
473        Ok(Self {
474            scheduling: match scheduling_policy {
475                proto::schedules::v1::scheduling_policy::Policy::Interval(fixed) => {
476                    ora_backend::schedules::SchedulingPolicy::FixedInterval {
477                        interval: fixed
478                            .interval
479                            .unwrap_or_default()
480                            .try_into()
481                            .unwrap_or_default(),
482                        immediate: fixed.immediate,
483                        missed: fixed.missed_time_policy().into(),
484                    }
485                }
486                proto::schedules::v1::scheduling_policy::Policy::Cron(cron) => {
487                    ora_backend::schedules::SchedulingPolicy::Cron {
488                        missed: cron.missed_time_policy().into(),
489                        expression: cron.cron_expression,
490                        immediate: cron.immediate,
491                    }
492                }
493            },
494            job_template: value
495                .job_template
496                .ok_or_else(|| tonic::Status::invalid_argument("missing job_template"))?
497                .try_into()?,
498            labels: value.labels.into_iter().map(Into::into).collect(),
499            time_range: value.time_range.unwrap_or_default().try_into()?,
500        })
501    }
502}
503
504impl From<ora_backend::schedules::ScheduleDefinition> for proto::schedules::v1::Schedule {
505    fn from(value: ora_backend::schedules::ScheduleDefinition) -> Self {
506        let policy = match value.scheduling {
507            ora_backend::schedules::SchedulingPolicy::FixedInterval {
508                interval,
509                immediate,
510                missed,
511            } => proto::schedules::v1::scheduling_policy::Policy::Interval(
512                proto::schedules::v1::SchedulingPolicyInterval {
513                    interval: Some(prost_types::Duration::try_from(interval).unwrap_or_default()),
514                    immediate,
515                    missed_time_policy: proto::schedules::v1::MissedTimePolicy::from(missed).into(),
516                },
517            ),
518            ora_backend::schedules::SchedulingPolicy::Cron {
519                expression,
520                immediate,
521                missed,
522            } => proto::schedules::v1::scheduling_policy::Policy::Cron(
523                proto::schedules::v1::SchedulingPolicyCron {
524                    cron_expression: expression,
525                    immediate,
526                    missed_time_policy: proto::schedules::v1::MissedTimePolicy::from(missed).into(),
527                },
528            ),
529        };
530
531        Self {
532            scheduling: Some(proto::schedules::v1::SchedulingPolicy {
533                policy: Some(policy),
534            }),
535            job_template: Some(value.job_template.into()),
536            labels: value.labels.into_iter().map(Into::into).collect(),
537            time_range: Some(value.time_range.into()),
538        }
539    }
540}
541
542impl From<proto::schedules::v1::MissedTimePolicy> for ora_backend::schedules::MissedTimePolicy {
543    fn from(value: proto::schedules::v1::MissedTimePolicy) -> Self {
544        match value {
545            proto::schedules::v1::MissedTimePolicy::Skip
546            | proto::schedules::v1::MissedTimePolicy::Unspecified => {
547                ora_backend::schedules::MissedTimePolicy::Skip
548            }
549            proto::schedules::v1::MissedTimePolicy::Create => {
550                ora_backend::schedules::MissedTimePolicy::Create
551            }
552        }
553    }
554}
555
556impl From<ora_backend::schedules::MissedTimePolicy> for proto::schedules::v1::MissedTimePolicy {
557    fn from(value: ora_backend::schedules::MissedTimePolicy) -> Self {
558        match value {
559            ora_backend::schedules::MissedTimePolicy::Skip => {
560                proto::schedules::v1::MissedTimePolicy::Skip
561            }
562            ora_backend::schedules::MissedTimePolicy::Create => {
563                proto::schedules::v1::MissedTimePolicy::Create
564            }
565        }
566    }
567}
568
569impl TryFrom<proto::admin::v1::ScheduleFilters> for ora_backend::schedules::ScheduleFilters {
570    type Error = tonic::Status;
571
572    fn try_from(value: proto::admin::v1::ScheduleFilters) -> Result<Self, Self::Error> {
573        Ok(Self {
574            statuses: if value.statuses.is_empty() {
575                None
576            } else {
577                Some(value.statuses().map(Into::into).collect::<Vec<_>>())
578            },
579            schedule_ids: if value.schedule_ids.is_empty() {
580                None
581            } else {
582                Some(
583                    value
584                        .schedule_ids
585                        .into_iter()
586                        .map(|id| {
587                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
588                                Status::invalid_argument(format!("invalid schedule ID '{id}': {e}"))
589                            })
590                        })
591                        .collect::<Result<_, _>>()?,
592                )
593            },
594            created_at: match value.created_at {
595                Some(range) => Some(range.try_into()?),
596                None => None,
597            },
598            labels: if value.labels.is_empty() {
599                None
600            } else {
601                Some(value.labels.into_iter().map(Into::into).collect::<Vec<_>>())
602            },
603            job_type_ids: if value.job_type_ids.is_empty() {
604                None
605            } else {
606                Some(
607                    value
608                        .job_type_ids
609                        .into_iter()
610                        .map(|id| {
611                            JobTypeId::new(id).map_err(|e| Status::invalid_argument(e.to_string()))
612                        })
613                        .collect::<Result<_, _>>()?,
614                )
615            },
616        })
617    }
618}
619
620impl From<ora_backend::schedules::ScheduleStatus> for proto::admin::v1::ScheduleStatus {
621    fn from(value: ora_backend::schedules::ScheduleStatus) -> Self {
622        match value {
623            ora_backend::schedules::ScheduleStatus::Active => {
624                proto::admin::v1::ScheduleStatus::Active
625            }
626            ora_backend::schedules::ScheduleStatus::Stopped => {
627                proto::admin::v1::ScheduleStatus::Stopped
628            }
629        }
630    }
631}
632
633impl From<proto::admin::v1::ScheduleStatus> for ora_backend::schedules::ScheduleStatus {
634    fn from(value: proto::admin::v1::ScheduleStatus) -> Self {
635        match value {
636            proto::admin::v1::ScheduleStatus::Active
637            | proto::admin::v1::ScheduleStatus::Unspecified => {
638                ora_backend::schedules::ScheduleStatus::Active
639            }
640            proto::admin::v1::ScheduleStatus::Stopped => {
641                ora_backend::schedules::ScheduleStatus::Stopped
642            }
643        }
644    }
645}
646
647impl From<ora_backend::schedules::ScheduleDetails> for proto::admin::v1::Schedule {
648    fn from(value: ora_backend::schedules::ScheduleDetails) -> Self {
649        Self {
650            id: value.id.to_string(),
651            schedule: Some(value.schedule.into()),
652            created_at: Some(value.created_at.into()),
653            status: proto::admin::v1::ScheduleStatus::from(value.status).into(),
654            stopped_at: value.stopped_at.map(Into::into),
655        }
656    }
657}
658
659impl TryFrom<proto::admin::v1::ScheduleOrderBy>
660    for Option<ora_backend::schedules::ScheduleOrderBy>
661{
662    type Error = tonic::Status;
663
664    fn try_from(value: proto::admin::v1::ScheduleOrderBy) -> Result<Self, Self::Error> {
665        match value {
666            proto::admin::v1::ScheduleOrderBy::Unspecified => Ok(None),
667            proto::admin::v1::ScheduleOrderBy::CreatedAtAsc => {
668                Ok(Some(ora_backend::schedules::ScheduleOrderBy::CreatedAtAsc))
669            }
670            proto::admin::v1::ScheduleOrderBy::CreatedAtDesc => {
671                Ok(Some(ora_backend::schedules::ScheduleOrderBy::CreatedAtDesc))
672            }
673        }
674    }
675}