Skip to main content

ora_server/grpc/
conv.rs

1use ora_backend::jobs::JobTypeId;
2use tonic::Status;
3use uuid::Uuid;
4
5use crate::proto;
6
7pub(super) trait ResultErrExt<T>: Sized {
8    fn err_status(self) -> Result<T, Status>;
9}
10
11impl<T, E> ResultErrExt<T> for Result<T, E>
12where
13    E: std::error::Error,
14{
15    fn err_status(self) -> Result<T, Status> {
16        self.map_err(backend_err_to_status)
17    }
18}
19
20fn backend_err_to_status<E>(err: E) -> Status
21where
22    E: std::error::Error,
23{
24    Status::internal(err.to_string())
25}
26
27impl TryFrom<proto::jobs::v1::JobType> for ora_backend::jobs::JobType {
28    type Error = tonic::Status;
29
30    fn try_from(value: proto::jobs::v1::JobType) -> Result<Self, Self::Error> {
31        Ok(Self {
32            id: JobTypeId::new(value.id).map_err(|e| Status::invalid_argument(e.to_string()))?,
33            description: value.description,
34            input_schema_json: value.input_schema_json,
35            output_schema_json: value.output_schema_json,
36        })
37    }
38}
39
40impl From<ora_backend::jobs::JobType> for proto::jobs::v1::JobType {
41    fn from(value: ora_backend::jobs::JobType) -> Self {
42        Self {
43            id: value.id.into_inner(),
44            description: value.description,
45            input_schema_json: value.input_schema_json,
46            output_schema_json: value.output_schema_json,
47        }
48    }
49}
50
51impl TryFrom<proto::admin::v1::JobFilters> for ora_backend::jobs::JobFilters {
52    type Error = tonic::Status;
53
54    fn try_from(value: proto::admin::v1::JobFilters) -> Result<Self, Self::Error> {
55        Ok(Self {
56            execution_statuses: if value.execution_statuses.is_empty() {
57                None
58            } else {
59                Some(
60                    value
61                        .execution_statuses()
62                        .map(Into::into)
63                        .collect::<Vec<_>>(),
64                )
65            },
66            job_ids: if value.job_ids.is_empty() {
67                None
68            } else {
69                Some(
70                    value
71                        .job_ids
72                        .into_iter()
73                        .map(|id| {
74                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
75                                Status::invalid_argument(format!("invalid job ID '{id}': {e}"))
76                            })
77                        })
78                        .collect::<Result<_, _>>()?,
79                )
80            },
81            job_type_ids: if value.job_type_ids.is_empty() {
82                None
83            } else {
84                Some(
85                    value
86                        .job_type_ids
87                        .into_iter()
88                        .map(|id| {
89                            JobTypeId::new(id).map_err(|e| Status::invalid_argument(e.to_string()))
90                        })
91                        .collect::<Result<_, _>>()?,
92                )
93            },
94            executor_ids: if value.executor_ids.is_empty() {
95                None
96            } else {
97                Some(
98                    value
99                        .executor_ids
100                        .into_iter()
101                        .map(|id| {
102                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
103                                Status::invalid_argument(format!("invalid executor ID '{id}': {e}"))
104                            })
105                        })
106                        .collect::<Result<_, _>>()?,
107                )
108            },
109            target_execution_time: match value.target_execution_time {
110                Some(range) => Some(range.try_into()?),
111                None => None,
112            },
113            created_at: match value.created_at {
114                Some(range) => Some(range.try_into()?),
115                None => None,
116            },
117            labels: if value.labels.is_empty() {
118                None
119            } else {
120                Some(value.labels.into_iter().map(Into::into).collect::<Vec<_>>())
121            },
122            execution_ids: if value.execution_ids.is_empty() {
123                None
124            } else {
125                Some(
126                    value
127                        .execution_ids
128                        .into_iter()
129                        .map(|id| {
130                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
131                                Status::invalid_argument(format!(
132                                    "invalid execution ID '{id}': {e}"
133                                ))
134                            })
135                        })
136                        .collect::<Result<_, _>>()?,
137                )
138            },
139            schedule_ids: if value.schedule_ids.is_empty() {
140                None
141            } else {
142                Some(
143                    value
144                        .schedule_ids
145                        .into_iter()
146                        .map(|id| {
147                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
148                                Status::invalid_argument(format!("invalid schedule ID '{id}': {e}"))
149                            })
150                        })
151                        .collect::<Result<_, _>>()?,
152                )
153            },
154        })
155    }
156}
157
158impl TryFrom<proto::admin::v1::JobOrderBy> for Option<ora_backend::jobs::JobOrderBy> {
159    type Error = tonic::Status;
160
161    fn try_from(value: proto::admin::v1::JobOrderBy) -> Result<Self, Self::Error> {
162        match value {
163            proto::admin::v1::JobOrderBy::Unspecified => Ok(None),
164            proto::admin::v1::JobOrderBy::CreatedAtAsc => {
165                Ok(Some(ora_backend::jobs::JobOrderBy::CreatedAtAsc))
166            }
167            proto::admin::v1::JobOrderBy::CreatedAtDesc => {
168                Ok(Some(ora_backend::jobs::JobOrderBy::CreatedAtDesc))
169            }
170            proto::admin::v1::JobOrderBy::TargetExecutionTimeAsc => {
171                Ok(Some(ora_backend::jobs::JobOrderBy::TargetExecutionTimeAsc))
172            }
173            proto::admin::v1::JobOrderBy::TargetExecutionTimeDesc => {
174                Ok(Some(ora_backend::jobs::JobOrderBy::TargetExecutionTimeDesc))
175            }
176        }
177    }
178}
179
180impl TryFrom<proto::common::v1::TimeRange> for ora_backend::common::TimeRange {
181    type Error = tonic::Status;
182
183    fn try_from(value: proto::common::v1::TimeRange) -> Result<Self, Self::Error> {
184        Ok(Self {
185            start: match value.start {
186                Some(ts) => {
187                    Some(ts.try_into().map_err(|e| {
188                        Status::invalid_argument(format!("invalid start time: {e}"))
189                    })?)
190                }
191                None => None,
192            },
193            end: match value.end {
194                Some(ts) => Some(
195                    ts.try_into()
196                        .map_err(|e| Status::invalid_argument(format!("invalid end time: {e}")))?,
197                ),
198                None => None,
199            },
200        })
201    }
202}
203
204impl From<ora_backend::common::TimeRange> for proto::common::v1::TimeRange {
205    fn from(value: ora_backend::common::TimeRange) -> Self {
206        Self {
207            start: value.start.map(Into::into),
208            end: value.end.map(Into::into),
209        }
210    }
211}
212
213impl From<proto::common::v1::Label> for ora_backend::common::Label {
214    fn from(value: proto::common::v1::Label) -> Self {
215        Self {
216            key: value.key,
217            value: value.value,
218        }
219    }
220}
221
222impl From<ora_backend::common::Label> for proto::common::v1::Label {
223    fn from(value: ora_backend::common::Label) -> Self {
224        Self {
225            key: value.key,
226            value: value.value,
227        }
228    }
229}
230
231impl From<proto::common::v1::LabelFilter> for ora_backend::common::LabelFilter {
232    fn from(value: proto::common::v1::LabelFilter) -> Self {
233        Self {
234            key: value.key,
235            value: value.value,
236        }
237    }
238}
239
240impl From<ora_backend::common::LabelFilter> for proto::common::v1::LabelFilter {
241    fn from(value: ora_backend::common::LabelFilter) -> Self {
242        Self {
243            key: value.key,
244            value: value.value,
245        }
246    }
247}
248
249impl TryFrom<proto::jobs::v1::Job> for ora_backend::jobs::JobDefinition {
250    type Error = tonic::Status;
251
252    fn try_from(value: proto::jobs::v1::Job) -> Result<Self, Self::Error> {
253        Ok(Self {
254            job_type_id: JobTypeId::new(value.job_type_id)
255                .map_err(|e| Status::invalid_argument(e.to_string()))?,
256            target_execution_time: value
257                .target_execution_time
258                .ok_or_else(|| Status::invalid_argument("missing target_execution_time"))?
259                .try_into()
260                .map_err(|e| {
261                    Status::invalid_argument(format!("invalid target_execution_time: {e}"))
262                })?,
263            input_payload_json: value.input_payload_json,
264            timeout_policy: value.timeout_policy.unwrap_or_default().into(),
265            retry_policy: value.retry_policy.unwrap_or_default().into(),
266            labels: value.labels.into_iter().map(Into::into).collect(),
267        })
268    }
269}
270
271impl From<ora_backend::jobs::JobDefinition> for proto::jobs::v1::Job {
272    fn from(value: ora_backend::jobs::JobDefinition) -> Self {
273        Self {
274            job_type_id: value.job_type_id.into_inner(),
275            target_execution_time: Some(value.target_execution_time.into()),
276            input_payload_json: value.input_payload_json,
277            timeout_policy: Some(value.timeout_policy.into()),
278            retry_policy: Some(value.retry_policy.into()),
279            labels: value.labels.into_iter().map(Into::into).collect(),
280        }
281    }
282}
283
284impl From<proto::jobs::v1::TimeoutPolicy> for ora_backend::jobs::TimeoutPolicy {
285    fn from(value: proto::jobs::v1::TimeoutPolicy) -> Self {
286        Self {
287            timeout: value
288                .timeout
289                .unwrap_or_default()
290                .try_into()
291                .unwrap_or_default(),
292            base_time: value.base_time().into(),
293        }
294    }
295}
296
297impl From<ora_backend::jobs::TimeoutPolicy> for proto::jobs::v1::TimeoutPolicy {
298    fn from(value: ora_backend::jobs::TimeoutPolicy) -> Self {
299        Self {
300            timeout: Some(prost_types::Duration::try_from(value.timeout).unwrap_or_default()),
301            base_time: proto::jobs::v1::TimeoutBaseTime::from(value.base_time).into(),
302        }
303    }
304}
305
306impl From<proto::jobs::v1::TimeoutBaseTime> for ora_backend::jobs::TimeoutBaseTime {
307    fn from(value: proto::jobs::v1::TimeoutBaseTime) -> Self {
308        match value {
309            proto::jobs::v1::TimeoutBaseTime::Unspecified
310            | proto::jobs::v1::TimeoutBaseTime::StartTime => {
311                ora_backend::jobs::TimeoutBaseTime::StartTime
312            }
313            proto::jobs::v1::TimeoutBaseTime::TargetExecutionTime => {
314                ora_backend::jobs::TimeoutBaseTime::TargetExecutionTime
315            }
316        }
317    }
318}
319
320impl From<ora_backend::jobs::TimeoutBaseTime> for proto::jobs::v1::TimeoutBaseTime {
321    fn from(value: ora_backend::jobs::TimeoutBaseTime) -> Self {
322        match value {
323            ora_backend::jobs::TimeoutBaseTime::StartTime => {
324                proto::jobs::v1::TimeoutBaseTime::StartTime
325            }
326            ora_backend::jobs::TimeoutBaseTime::TargetExecutionTime => {
327                proto::jobs::v1::TimeoutBaseTime::TargetExecutionTime
328            }
329        }
330    }
331}
332
333impl From<proto::jobs::v1::RetryPolicy> for ora_backend::jobs::RetryPolicy {
334    fn from(value: proto::jobs::v1::RetryPolicy) -> Self {
335        Self {
336            retries: value.retries,
337        }
338    }
339}
340
341impl From<ora_backend::jobs::RetryPolicy> for proto::jobs::v1::RetryPolicy {
342    fn from(value: ora_backend::jobs::RetryPolicy) -> Self {
343        Self {
344            retries: value.retries,
345        }
346    }
347}
348
349impl From<ora_backend::executions::ExecutionStatus> for proto::admin::v1::ExecutionStatus {
350    fn from(value: ora_backend::executions::ExecutionStatus) -> Self {
351        match value {
352            ora_backend::executions::ExecutionStatus::Pending => {
353                proto::admin::v1::ExecutionStatus::Pending
354            }
355            ora_backend::executions::ExecutionStatus::InProgress => {
356                proto::admin::v1::ExecutionStatus::InProgress
357            }
358            ora_backend::executions::ExecutionStatus::Succeeded => {
359                proto::admin::v1::ExecutionStatus::Succeeded
360            }
361            ora_backend::executions::ExecutionStatus::Failed => {
362                proto::admin::v1::ExecutionStatus::Failed
363            }
364            ora_backend::executions::ExecutionStatus::Cancelled => {
365                proto::admin::v1::ExecutionStatus::Cancelled
366            }
367        }
368    }
369}
370
371impl From<proto::admin::v1::ExecutionStatus> for ora_backend::executions::ExecutionStatus {
372    fn from(value: proto::admin::v1::ExecutionStatus) -> Self {
373        match value {
374            proto::admin::v1::ExecutionStatus::Pending
375            | proto::admin::v1::ExecutionStatus::Unspecified => {
376                ora_backend::executions::ExecutionStatus::Pending
377            }
378            proto::admin::v1::ExecutionStatus::InProgress => {
379                ora_backend::executions::ExecutionStatus::InProgress
380            }
381            proto::admin::v1::ExecutionStatus::Succeeded => {
382                ora_backend::executions::ExecutionStatus::Succeeded
383            }
384            proto::admin::v1::ExecutionStatus::Failed => {
385                ora_backend::executions::ExecutionStatus::Failed
386            }
387            proto::admin::v1::ExecutionStatus::Cancelled => {
388                ora_backend::executions::ExecutionStatus::Cancelled
389            }
390        }
391    }
392}
393
394impl From<ora_backend::executions::ExecutionDetails> for proto::admin::v1::Execution {
395    fn from(value: ora_backend::executions::ExecutionDetails) -> Self {
396        Self {
397            id: value.id.to_string(),
398            executor_id: value.executor_id.map(|id| id.0.to_string()),
399            status: proto::admin::v1::ExecutionStatus::from(value.status).into(),
400            created_at: Some(value.created_at.into()),
401            started_at: value.started_at.map(Into::into),
402            succeeded_at: value.succeeded_at.map(Into::into),
403            failed_at: value.failed_at.map(Into::into),
404            cancelled_at: value.cancelled_at.map(Into::into),
405            output_json: value.output_json,
406            failure_reason: value.failure_reason,
407        }
408    }
409}
410
411impl From<ora_backend::jobs::JobDetails> for proto::admin::v1::Job {
412    fn from(value: ora_backend::jobs::JobDetails) -> Self {
413        Self {
414            id: value.id.to_string(),
415            job: Some(value.job.into()),
416            schedule_id: value.schedule_id.map(|id| id.0.to_string()),
417            created_at: Some(value.created_at.into()),
418            executions: value.executions.into_iter().map(Into::into).collect(),
419        }
420    }
421}
422
423impl TryFrom<proto::schedules::v1::Schedule> for ora_backend::schedules::ScheduleDefinition {
424    type Error = tonic::Status;
425
426    fn try_from(value: proto::schedules::v1::Schedule) -> Result<Self, Self::Error> {
427        let scheduling_policy = value
428            .scheduling
429            .unwrap_or_default()
430            .policy
431            .ok_or_else(|| tonic::Status::invalid_argument("missing scheduling policy"))?;
432
433        Ok(Self {
434            scheduling: match scheduling_policy {
435                proto::schedules::v1::scheduling_policy::Policy::Interval(fixed) => {
436                    ora_backend::schedules::SchedulingPolicy::FixedInterval {
437                        interval: fixed
438                            .interval
439                            .unwrap_or_default()
440                            .try_into()
441                            .unwrap_or_default(),
442                        immediate: fixed.immediate,
443                        missed: fixed.missed_time_policy().into(),
444                    }
445                }
446                proto::schedules::v1::scheduling_policy::Policy::Cron(cron) => {
447                    ora_backend::schedules::SchedulingPolicy::Cron {
448                        missed: cron.missed_time_policy().into(),
449                        expression: cron.cron_expression,
450                        immediate: cron.immediate,
451                    }
452                }
453            },
454            job_template: value
455                .job_template
456                .ok_or_else(|| tonic::Status::invalid_argument("missing job_template"))?
457                .try_into()?,
458            labels: value.labels.into_iter().map(Into::into).collect(),
459            time_range: value.time_range.unwrap_or_default().try_into()?,
460        })
461    }
462}
463
464impl From<ora_backend::schedules::ScheduleDefinition> for proto::schedules::v1::Schedule {
465    fn from(value: ora_backend::schedules::ScheduleDefinition) -> Self {
466        let policy = match value.scheduling {
467            ora_backend::schedules::SchedulingPolicy::FixedInterval {
468                interval,
469                immediate,
470                missed,
471            } => proto::schedules::v1::scheduling_policy::Policy::Interval(
472                proto::schedules::v1::SchedulingPolicyInterval {
473                    interval: Some(prost_types::Duration::try_from(interval).unwrap_or_default()),
474                    immediate,
475                    missed_time_policy: proto::schedules::v1::MissedTimePolicy::from(missed).into(),
476                },
477            ),
478            ora_backend::schedules::SchedulingPolicy::Cron {
479                expression,
480                immediate,
481                missed,
482            } => proto::schedules::v1::scheduling_policy::Policy::Cron(
483                proto::schedules::v1::SchedulingPolicyCron {
484                    cron_expression: expression,
485                    immediate,
486                    missed_time_policy: proto::schedules::v1::MissedTimePolicy::from(missed).into(),
487                },
488            ),
489        };
490
491        Self {
492            scheduling: Some(proto::schedules::v1::SchedulingPolicy {
493                policy: Some(policy),
494            }),
495            job_template: Some(value.job_template.into()),
496            labels: value.labels.into_iter().map(Into::into).collect(),
497            time_range: Some(value.time_range.into()),
498        }
499    }
500}
501
502impl From<proto::schedules::v1::MissedTimePolicy> for ora_backend::schedules::MissedTimePolicy {
503    fn from(value: proto::schedules::v1::MissedTimePolicy) -> Self {
504        match value {
505            proto::schedules::v1::MissedTimePolicy::Skip
506            | proto::schedules::v1::MissedTimePolicy::Unspecified => {
507                ora_backend::schedules::MissedTimePolicy::Skip
508            }
509            proto::schedules::v1::MissedTimePolicy::Create => {
510                ora_backend::schedules::MissedTimePolicy::Create
511            }
512        }
513    }
514}
515
516impl From<ora_backend::schedules::MissedTimePolicy> for proto::schedules::v1::MissedTimePolicy {
517    fn from(value: ora_backend::schedules::MissedTimePolicy) -> Self {
518        match value {
519            ora_backend::schedules::MissedTimePolicy::Skip => {
520                proto::schedules::v1::MissedTimePolicy::Skip
521            }
522            ora_backend::schedules::MissedTimePolicy::Create => {
523                proto::schedules::v1::MissedTimePolicy::Create
524            }
525        }
526    }
527}
528
529impl TryFrom<proto::admin::v1::ScheduleFilters> for ora_backend::schedules::ScheduleFilters {
530    type Error = tonic::Status;
531
532    fn try_from(value: proto::admin::v1::ScheduleFilters) -> Result<Self, Self::Error> {
533        Ok(Self {
534            statuses: if value.statuses.is_empty() {
535                None
536            } else {
537                Some(value.statuses().map(Into::into).collect::<Vec<_>>())
538            },
539            schedule_ids: if value.schedule_ids.is_empty() {
540                None
541            } else {
542                Some(
543                    value
544                        .schedule_ids
545                        .into_iter()
546                        .map(|id| {
547                            id.parse::<Uuid>().map(Into::into).map_err(|e| {
548                                Status::invalid_argument(format!("invalid schedule ID '{id}': {e}"))
549                            })
550                        })
551                        .collect::<Result<_, _>>()?,
552                )
553            },
554            created_at: match value.created_at {
555                Some(range) => Some(range.try_into()?),
556                None => None,
557            },
558            labels: if value.labels.is_empty() {
559                None
560            } else {
561                Some(value.labels.into_iter().map(Into::into).collect::<Vec<_>>())
562            },
563            job_type_ids: if value.job_type_ids.is_empty() {
564                None
565            } else {
566                Some(
567                    value
568                        .job_type_ids
569                        .into_iter()
570                        .map(|id| {
571                            JobTypeId::new(id).map_err(|e| Status::invalid_argument(e.to_string()))
572                        })
573                        .collect::<Result<_, _>>()?,
574                )
575            },
576        })
577    }
578}
579
580impl From<ora_backend::schedules::ScheduleStatus> for proto::admin::v1::ScheduleStatus {
581    fn from(value: ora_backend::schedules::ScheduleStatus) -> Self {
582        match value {
583            ora_backend::schedules::ScheduleStatus::Active => {
584                proto::admin::v1::ScheduleStatus::Active
585            }
586            ora_backend::schedules::ScheduleStatus::Stopped => {
587                proto::admin::v1::ScheduleStatus::Stopped
588            }
589        }
590    }
591}
592
593impl From<proto::admin::v1::ScheduleStatus> for ora_backend::schedules::ScheduleStatus {
594    fn from(value: proto::admin::v1::ScheduleStatus) -> Self {
595        match value {
596            proto::admin::v1::ScheduleStatus::Active
597            | proto::admin::v1::ScheduleStatus::Unspecified => {
598                ora_backend::schedules::ScheduleStatus::Active
599            }
600            proto::admin::v1::ScheduleStatus::Stopped => {
601                ora_backend::schedules::ScheduleStatus::Stopped
602            }
603        }
604    }
605}
606
607impl From<ora_backend::schedules::ScheduleDetails> for proto::admin::v1::Schedule {
608    fn from(value: ora_backend::schedules::ScheduleDetails) -> Self {
609        Self {
610            id: value.id.to_string(),
611            schedule: Some(value.schedule.into()),
612            created_at: Some(value.created_at.into()),
613            status: proto::admin::v1::ScheduleStatus::from(value.status).into(),
614            stopped_at: value.stopped_at.map(Into::into),
615        }
616    }
617}
618
619impl TryFrom<proto::admin::v1::ScheduleOrderBy>
620    for Option<ora_backend::schedules::ScheduleOrderBy>
621{
622    type Error = tonic::Status;
623
624    fn try_from(value: proto::admin::v1::ScheduleOrderBy) -> Result<Self, Self::Error> {
625        match value {
626            proto::admin::v1::ScheduleOrderBy::Unspecified => Ok(None),
627            proto::admin::v1::ScheduleOrderBy::CreatedAtAsc => {
628                Ok(Some(ora_backend::schedules::ScheduleOrderBy::CreatedAtAsc))
629            }
630            proto::admin::v1::ScheduleOrderBy::CreatedAtDesc => {
631                Ok(Some(ora_backend::schedules::ScheduleOrderBy::CreatedAtDesc))
632            }
633        }
634    }
635}