1use ora_backend::jobs::JobTypeId;
2use tonic::Status;
3use uuid::Uuid;
4
5use crate::proto;
6
7pub(super) trait ResultErrExt<T>: Sized {
8 fn err_status(self) -> Result<T, Status>;
9}
10
11impl<T, E> ResultErrExt<T> for Result<T, E>
12where
13 E: std::error::Error,
14{
15 fn err_status(self) -> Result<T, Status> {
16 self.map_err(backend_err_to_status)
17 }
18}
19
20fn backend_err_to_status<E>(err: E) -> Status
21where
22 E: std::error::Error,
23{
24 Status::internal(err.to_string())
25}
26
27impl TryFrom<proto::jobs::v1::JobType> for ora_backend::jobs::JobType {
28 type Error = tonic::Status;
29
30 fn try_from(value: proto::jobs::v1::JobType) -> Result<Self, Self::Error> {
31 Ok(Self {
32 id: JobTypeId::new(value.id).map_err(|e| Status::invalid_argument(e.to_string()))?,
33 description: value.description,
34 input_schema_json: value.input_schema_json,
35 output_schema_json: value.output_schema_json,
36 })
37 }
38}
39
40impl From<ora_backend::jobs::JobType> for proto::jobs::v1::JobType {
41 fn from(value: ora_backend::jobs::JobType) -> Self {
42 Self {
43 id: value.id.into_inner(),
44 description: value.description,
45 input_schema_json: value.input_schema_json,
46 output_schema_json: value.output_schema_json,
47 }
48 }
49}
50
51impl TryFrom<proto::admin::v1::JobFilters> for ora_backend::jobs::JobFilters {
52 type Error = tonic::Status;
53
54 fn try_from(value: proto::admin::v1::JobFilters) -> Result<Self, Self::Error> {
55 Ok(Self {
56 execution_statuses: if value.execution_statuses.is_empty() {
57 None
58 } else {
59 Some(
60 value
61 .execution_statuses()
62 .map(Into::into)
63 .collect::<Vec<_>>(),
64 )
65 },
66 job_ids: if value.job_ids.is_empty() {
67 None
68 } else {
69 Some(
70 value
71 .job_ids
72 .into_iter()
73 .map(|id| {
74 id.parse::<Uuid>().map(Into::into).map_err(|e| {
75 Status::invalid_argument(format!("invalid job ID '{id}': {e}"))
76 })
77 })
78 .collect::<Result<_, _>>()?,
79 )
80 },
81 job_type_ids: if value.job_type_ids.is_empty() {
82 None
83 } else {
84 Some(
85 value
86 .job_type_ids
87 .into_iter()
88 .map(|id| {
89 JobTypeId::new(id).map_err(|e| Status::invalid_argument(e.to_string()))
90 })
91 .collect::<Result<_, _>>()?,
92 )
93 },
94 executor_ids: if value.executor_ids.is_empty() {
95 None
96 } else {
97 Some(
98 value
99 .executor_ids
100 .into_iter()
101 .map(|id| {
102 id.parse::<Uuid>().map(Into::into).map_err(|e| {
103 Status::invalid_argument(format!("invalid executor ID '{id}': {e}"))
104 })
105 })
106 .collect::<Result<_, _>>()?,
107 )
108 },
109 target_execution_time: match value.target_execution_time {
110 Some(range) => Some(range.try_into()?),
111 None => None,
112 },
113 created_at: match value.created_at {
114 Some(range) => Some(range.try_into()?),
115 None => None,
116 },
117 labels: if value.labels.is_empty() {
118 None
119 } else {
120 Some(value.labels.into_iter().map(Into::into).collect::<Vec<_>>())
121 },
122 execution_ids: if value.execution_ids.is_empty() {
123 None
124 } else {
125 Some(
126 value
127 .execution_ids
128 .into_iter()
129 .map(|id| {
130 id.parse::<Uuid>().map(Into::into).map_err(|e| {
131 Status::invalid_argument(format!(
132 "invalid execution ID '{id}': {e}"
133 ))
134 })
135 })
136 .collect::<Result<_, _>>()?,
137 )
138 },
139 schedule_ids: if value.schedule_ids.is_empty() {
140 None
141 } else {
142 Some(
143 value
144 .schedule_ids
145 .into_iter()
146 .map(|id| {
147 id.parse::<Uuid>().map(Into::into).map_err(|e| {
148 Status::invalid_argument(format!("invalid schedule ID '{id}': {e}"))
149 })
150 })
151 .collect::<Result<_, _>>()?,
152 )
153 },
154 })
155 }
156}
157
158impl TryFrom<proto::admin::v1::JobOrderBy> for Option<ora_backend::jobs::JobOrderBy> {
159 type Error = tonic::Status;
160
161 fn try_from(value: proto::admin::v1::JobOrderBy) -> Result<Self, Self::Error> {
162 match value {
163 proto::admin::v1::JobOrderBy::Unspecified => Ok(None),
164 proto::admin::v1::JobOrderBy::CreatedAtAsc => {
165 Ok(Some(ora_backend::jobs::JobOrderBy::CreatedAtAsc))
166 }
167 proto::admin::v1::JobOrderBy::CreatedAtDesc => {
168 Ok(Some(ora_backend::jobs::JobOrderBy::CreatedAtDesc))
169 }
170 proto::admin::v1::JobOrderBy::TargetExecutionTimeAsc => {
171 Ok(Some(ora_backend::jobs::JobOrderBy::TargetExecutionTimeAsc))
172 }
173 proto::admin::v1::JobOrderBy::TargetExecutionTimeDesc => {
174 Ok(Some(ora_backend::jobs::JobOrderBy::TargetExecutionTimeDesc))
175 }
176 }
177 }
178}
179
180impl TryFrom<proto::common::v1::TimeRange> for ora_backend::common::TimeRange {
181 type Error = tonic::Status;
182
183 fn try_from(value: proto::common::v1::TimeRange) -> Result<Self, Self::Error> {
184 Ok(Self {
185 start: match value.start {
186 Some(ts) => {
187 Some(ts.try_into().map_err(|e| {
188 Status::invalid_argument(format!("invalid start time: {e}"))
189 })?)
190 }
191 None => None,
192 },
193 end: match value.end {
194 Some(ts) => Some(
195 ts.try_into()
196 .map_err(|e| Status::invalid_argument(format!("invalid end time: {e}")))?,
197 ),
198 None => None,
199 },
200 })
201 }
202}
203
204impl From<ora_backend::common::TimeRange> for proto::common::v1::TimeRange {
205 fn from(value: ora_backend::common::TimeRange) -> Self {
206 Self {
207 start: value.start.map(Into::into),
208 end: value.end.map(Into::into),
209 }
210 }
211}
212
213impl From<proto::common::v1::Label> for ora_backend::common::Label {
214 fn from(value: proto::common::v1::Label) -> Self {
215 Self {
216 key: value.key,
217 value: value.value,
218 }
219 }
220}
221
222impl From<ora_backend::common::Label> for proto::common::v1::Label {
223 fn from(value: ora_backend::common::Label) -> Self {
224 Self {
225 key: value.key,
226 value: value.value,
227 }
228 }
229}
230
231impl From<proto::common::v1::LabelFilter> for ora_backend::common::LabelFilter {
232 fn from(value: proto::common::v1::LabelFilter) -> Self {
233 Self {
234 key: value.key,
235 value: value.value,
236 }
237 }
238}
239
240impl From<ora_backend::common::LabelFilter> for proto::common::v1::LabelFilter {
241 fn from(value: ora_backend::common::LabelFilter) -> Self {
242 Self {
243 key: value.key,
244 value: value.value,
245 }
246 }
247}
248
249impl TryFrom<proto::jobs::v1::Job> for ora_backend::jobs::JobDefinition {
250 type Error = tonic::Status;
251
252 fn try_from(value: proto::jobs::v1::Job) -> Result<Self, Self::Error> {
253 Ok(Self {
254 job_type_id: JobTypeId::new(value.job_type_id)
255 .map_err(|e| Status::invalid_argument(e.to_string()))?,
256 target_execution_time: value
257 .target_execution_time
258 .ok_or_else(|| Status::invalid_argument("missing target_execution_time"))?
259 .try_into()
260 .map_err(|e| {
261 Status::invalid_argument(format!("invalid target_execution_time: {e}"))
262 })?,
263 input_payload_json: value.input_payload_json,
264 timeout_policy: value.timeout_policy.unwrap_or_default().into(),
265 retry_policy: value.retry_policy.unwrap_or_default().into(),
266 labels: value.labels.into_iter().map(Into::into).collect(),
267 })
268 }
269}
270
271impl From<ora_backend::jobs::JobDefinition> for proto::jobs::v1::Job {
272 fn from(value: ora_backend::jobs::JobDefinition) -> Self {
273 Self {
274 job_type_id: value.job_type_id.into_inner(),
275 target_execution_time: Some(value.target_execution_time.into()),
276 input_payload_json: value.input_payload_json,
277 timeout_policy: Some(value.timeout_policy.into()),
278 retry_policy: Some(value.retry_policy.into()),
279 labels: value.labels.into_iter().map(Into::into).collect(),
280 }
281 }
282}
283
284impl From<proto::jobs::v1::TimeoutPolicy> for ora_backend::jobs::TimeoutPolicy {
285 fn from(value: proto::jobs::v1::TimeoutPolicy) -> Self {
286 Self {
287 timeout: value
288 .timeout
289 .unwrap_or_default()
290 .try_into()
291 .unwrap_or_default(),
292 base_time: value.base_time().into(),
293 }
294 }
295}
296
297impl From<ora_backend::jobs::TimeoutPolicy> for proto::jobs::v1::TimeoutPolicy {
298 fn from(value: ora_backend::jobs::TimeoutPolicy) -> Self {
299 Self {
300 timeout: Some(prost_types::Duration::try_from(value.timeout).unwrap_or_default()),
301 base_time: proto::jobs::v1::TimeoutBaseTime::from(value.base_time).into(),
302 }
303 }
304}
305
306impl From<proto::jobs::v1::TimeoutBaseTime> for ora_backend::jobs::TimeoutBaseTime {
307 fn from(value: proto::jobs::v1::TimeoutBaseTime) -> Self {
308 match value {
309 proto::jobs::v1::TimeoutBaseTime::Unspecified
310 | proto::jobs::v1::TimeoutBaseTime::StartTime => {
311 ora_backend::jobs::TimeoutBaseTime::StartTime
312 }
313 proto::jobs::v1::TimeoutBaseTime::TargetExecutionTime => {
314 ora_backend::jobs::TimeoutBaseTime::TargetExecutionTime
315 }
316 }
317 }
318}
319
320impl From<ora_backend::jobs::TimeoutBaseTime> for proto::jobs::v1::TimeoutBaseTime {
321 fn from(value: ora_backend::jobs::TimeoutBaseTime) -> Self {
322 match value {
323 ora_backend::jobs::TimeoutBaseTime::StartTime => {
324 proto::jobs::v1::TimeoutBaseTime::StartTime
325 }
326 ora_backend::jobs::TimeoutBaseTime::TargetExecutionTime => {
327 proto::jobs::v1::TimeoutBaseTime::TargetExecutionTime
328 }
329 }
330 }
331}
332
333impl From<proto::jobs::v1::RetryPolicy> for ora_backend::jobs::RetryPolicy {
334 fn from(value: proto::jobs::v1::RetryPolicy) -> Self {
335 Self {
336 retries: value.retries,
337 backoff_duration: value
338 .backoff_duration
339 .unwrap_or_default()
340 .try_into()
341 .unwrap_or_default(),
342 max_backoff_duration: value
343 .max_backoff_duration
344 .map(|d| d.try_into().unwrap_or_default()),
345 backoff_strategy: value.backoff_strategy().into(),
346 }
347 }
348}
349
350impl From<ora_backend::jobs::RetryPolicy> for proto::jobs::v1::RetryPolicy {
351 fn from(value: ora_backend::jobs::RetryPolicy) -> Self {
352 Self {
353 retries: value.retries,
354 backoff_duration: Some(
355 prost_types::Duration::try_from(value.backoff_duration).unwrap_or_default(),
356 ),
357 max_backoff_duration: value
358 .max_backoff_duration
359 .map(|d| prost_types::Duration::try_from(d).unwrap_or_default()),
360 backoff_strategy: proto::jobs::v1::BackoffStrategy::from(value.backoff_strategy).into(),
361 }
362 }
363}
364
365impl From<proto::jobs::v1::BackoffStrategy> for ora_backend::jobs::BackoffStrategy {
366 fn from(value: proto::jobs::v1::BackoffStrategy) -> Self {
367 match value {
368 proto::jobs::v1::BackoffStrategy::Unspecified
369 | proto::jobs::v1::BackoffStrategy::Fixed => ora_backend::jobs::BackoffStrategy::Fixed,
370 proto::jobs::v1::BackoffStrategy::Exponential => {
371 ora_backend::jobs::BackoffStrategy::Exponential
372 }
373 }
374 }
375}
376
377impl From<ora_backend::jobs::BackoffStrategy> for proto::jobs::v1::BackoffStrategy {
378 fn from(value: ora_backend::jobs::BackoffStrategy) -> Self {
379 match value {
380 ora_backend::jobs::BackoffStrategy::Fixed => proto::jobs::v1::BackoffStrategy::Fixed,
381 ora_backend::jobs::BackoffStrategy::Exponential => {
382 proto::jobs::v1::BackoffStrategy::Exponential
383 }
384 }
385 }
386}
387
388impl From<ora_backend::executions::ExecutionStatus> for proto::admin::v1::ExecutionStatus {
389 fn from(value: ora_backend::executions::ExecutionStatus) -> Self {
390 match value {
391 ora_backend::executions::ExecutionStatus::Pending => {
392 proto::admin::v1::ExecutionStatus::Pending
393 }
394 ora_backend::executions::ExecutionStatus::InProgress => {
395 proto::admin::v1::ExecutionStatus::InProgress
396 }
397 ora_backend::executions::ExecutionStatus::Succeeded => {
398 proto::admin::v1::ExecutionStatus::Succeeded
399 }
400 ora_backend::executions::ExecutionStatus::Failed => {
401 proto::admin::v1::ExecutionStatus::Failed
402 }
403 ora_backend::executions::ExecutionStatus::Cancelled => {
404 proto::admin::v1::ExecutionStatus::Cancelled
405 }
406 }
407 }
408}
409
410impl From<proto::admin::v1::ExecutionStatus> for ora_backend::executions::ExecutionStatus {
411 fn from(value: proto::admin::v1::ExecutionStatus) -> Self {
412 match value {
413 proto::admin::v1::ExecutionStatus::Pending
414 | proto::admin::v1::ExecutionStatus::Unspecified => {
415 ora_backend::executions::ExecutionStatus::Pending
416 }
417 proto::admin::v1::ExecutionStatus::InProgress => {
418 ora_backend::executions::ExecutionStatus::InProgress
419 }
420 proto::admin::v1::ExecutionStatus::Succeeded => {
421 ora_backend::executions::ExecutionStatus::Succeeded
422 }
423 proto::admin::v1::ExecutionStatus::Failed => {
424 ora_backend::executions::ExecutionStatus::Failed
425 }
426 proto::admin::v1::ExecutionStatus::Cancelled => {
427 ora_backend::executions::ExecutionStatus::Cancelled
428 }
429 }
430 }
431}
432
433impl From<ora_backend::executions::ExecutionDetails> for proto::admin::v1::Execution {
434 fn from(value: ora_backend::executions::ExecutionDetails) -> Self {
435 Self {
436 id: value.id.to_string(),
437 executor_id: value.executor_id.map(|id| id.0.to_string()),
438 status: proto::admin::v1::ExecutionStatus::from(value.status).into(),
439 created_at: Some(value.created_at.into()),
440 started_at: value.started_at.map(Into::into),
441 succeeded_at: value.succeeded_at.map(Into::into),
442 failed_at: value.failed_at.map(Into::into),
443 cancelled_at: value.cancelled_at.map(Into::into),
444 output_json: value.output_json,
445 failure_reason: value.failure_reason,
446 target_execution_time: Some(value.target_execution_time.into()),
447 }
448 }
449}
450
451impl From<ora_backend::jobs::JobDetails> for proto::admin::v1::Job {
452 fn from(value: ora_backend::jobs::JobDetails) -> Self {
453 Self {
454 id: value.id.to_string(),
455 job: Some(value.job.into()),
456 schedule_id: value.schedule_id.map(|id| id.0.to_string()),
457 created_at: Some(value.created_at.into()),
458 executions: value.executions.into_iter().map(Into::into).collect(),
459 }
460 }
461}
462
463impl TryFrom<proto::schedules::v1::Schedule> for ora_backend::schedules::ScheduleDefinition {
464 type Error = tonic::Status;
465
466 fn try_from(value: proto::schedules::v1::Schedule) -> Result<Self, Self::Error> {
467 let scheduling_policy = value
468 .scheduling
469 .unwrap_or_default()
470 .policy
471 .ok_or_else(|| tonic::Status::invalid_argument("missing scheduling policy"))?;
472
473 Ok(Self {
474 scheduling: match scheduling_policy {
475 proto::schedules::v1::scheduling_policy::Policy::Interval(fixed) => {
476 ora_backend::schedules::SchedulingPolicy::FixedInterval {
477 interval: fixed
478 .interval
479 .unwrap_or_default()
480 .try_into()
481 .unwrap_or_default(),
482 immediate: fixed.immediate,
483 missed: fixed.missed_time_policy().into(),
484 }
485 }
486 proto::schedules::v1::scheduling_policy::Policy::Cron(cron) => {
487 ora_backend::schedules::SchedulingPolicy::Cron {
488 missed: cron.missed_time_policy().into(),
489 expression: cron.cron_expression,
490 immediate: cron.immediate,
491 }
492 }
493 },
494 job_template: value
495 .job_template
496 .ok_or_else(|| tonic::Status::invalid_argument("missing job_template"))?
497 .try_into()?,
498 labels: value.labels.into_iter().map(Into::into).collect(),
499 time_range: value.time_range.unwrap_or_default().try_into()?,
500 })
501 }
502}
503
504impl From<ora_backend::schedules::ScheduleDefinition> for proto::schedules::v1::Schedule {
505 fn from(value: ora_backend::schedules::ScheduleDefinition) -> Self {
506 let policy = match value.scheduling {
507 ora_backend::schedules::SchedulingPolicy::FixedInterval {
508 interval,
509 immediate,
510 missed,
511 } => proto::schedules::v1::scheduling_policy::Policy::Interval(
512 proto::schedules::v1::SchedulingPolicyInterval {
513 interval: Some(prost_types::Duration::try_from(interval).unwrap_or_default()),
514 immediate,
515 missed_time_policy: proto::schedules::v1::MissedTimePolicy::from(missed).into(),
516 },
517 ),
518 ora_backend::schedules::SchedulingPolicy::Cron {
519 expression,
520 immediate,
521 missed,
522 } => proto::schedules::v1::scheduling_policy::Policy::Cron(
523 proto::schedules::v1::SchedulingPolicyCron {
524 cron_expression: expression,
525 immediate,
526 missed_time_policy: proto::schedules::v1::MissedTimePolicy::from(missed).into(),
527 },
528 ),
529 };
530
531 Self {
532 scheduling: Some(proto::schedules::v1::SchedulingPolicy {
533 policy: Some(policy),
534 }),
535 job_template: Some(value.job_template.into()),
536 labels: value.labels.into_iter().map(Into::into).collect(),
537 time_range: Some(value.time_range.into()),
538 }
539 }
540}
541
542impl From<proto::schedules::v1::MissedTimePolicy> for ora_backend::schedules::MissedTimePolicy {
543 fn from(value: proto::schedules::v1::MissedTimePolicy) -> Self {
544 match value {
545 proto::schedules::v1::MissedTimePolicy::Skip
546 | proto::schedules::v1::MissedTimePolicy::Unspecified => {
547 ora_backend::schedules::MissedTimePolicy::Skip
548 }
549 proto::schedules::v1::MissedTimePolicy::Create => {
550 ora_backend::schedules::MissedTimePolicy::Create
551 }
552 }
553 }
554}
555
556impl From<ora_backend::schedules::MissedTimePolicy> for proto::schedules::v1::MissedTimePolicy {
557 fn from(value: ora_backend::schedules::MissedTimePolicy) -> Self {
558 match value {
559 ora_backend::schedules::MissedTimePolicy::Skip => {
560 proto::schedules::v1::MissedTimePolicy::Skip
561 }
562 ora_backend::schedules::MissedTimePolicy::Create => {
563 proto::schedules::v1::MissedTimePolicy::Create
564 }
565 }
566 }
567}
568
569impl TryFrom<proto::admin::v1::ScheduleFilters> for ora_backend::schedules::ScheduleFilters {
570 type Error = tonic::Status;
571
572 fn try_from(value: proto::admin::v1::ScheduleFilters) -> Result<Self, Self::Error> {
573 Ok(Self {
574 statuses: if value.statuses.is_empty() {
575 None
576 } else {
577 Some(value.statuses().map(Into::into).collect::<Vec<_>>())
578 },
579 schedule_ids: if value.schedule_ids.is_empty() {
580 None
581 } else {
582 Some(
583 value
584 .schedule_ids
585 .into_iter()
586 .map(|id| {
587 id.parse::<Uuid>().map(Into::into).map_err(|e| {
588 Status::invalid_argument(format!("invalid schedule ID '{id}': {e}"))
589 })
590 })
591 .collect::<Result<_, _>>()?,
592 )
593 },
594 created_at: match value.created_at {
595 Some(range) => Some(range.try_into()?),
596 None => None,
597 },
598 labels: if value.labels.is_empty() {
599 None
600 } else {
601 Some(value.labels.into_iter().map(Into::into).collect::<Vec<_>>())
602 },
603 job_type_ids: if value.job_type_ids.is_empty() {
604 None
605 } else {
606 Some(
607 value
608 .job_type_ids
609 .into_iter()
610 .map(|id| {
611 JobTypeId::new(id).map_err(|e| Status::invalid_argument(e.to_string()))
612 })
613 .collect::<Result<_, _>>()?,
614 )
615 },
616 })
617 }
618}
619
620impl From<ora_backend::schedules::ScheduleStatus> for proto::admin::v1::ScheduleStatus {
621 fn from(value: ora_backend::schedules::ScheduleStatus) -> Self {
622 match value {
623 ora_backend::schedules::ScheduleStatus::Active => {
624 proto::admin::v1::ScheduleStatus::Active
625 }
626 ora_backend::schedules::ScheduleStatus::Stopped => {
627 proto::admin::v1::ScheduleStatus::Stopped
628 }
629 }
630 }
631}
632
633impl From<proto::admin::v1::ScheduleStatus> for ora_backend::schedules::ScheduleStatus {
634 fn from(value: proto::admin::v1::ScheduleStatus) -> Self {
635 match value {
636 proto::admin::v1::ScheduleStatus::Active
637 | proto::admin::v1::ScheduleStatus::Unspecified => {
638 ora_backend::schedules::ScheduleStatus::Active
639 }
640 proto::admin::v1::ScheduleStatus::Stopped => {
641 ora_backend::schedules::ScheduleStatus::Stopped
642 }
643 }
644 }
645}
646
647impl From<ora_backend::schedules::ScheduleDetails> for proto::admin::v1::Schedule {
648 fn from(value: ora_backend::schedules::ScheduleDetails) -> Self {
649 Self {
650 id: value.id.to_string(),
651 schedule: Some(value.schedule.into()),
652 created_at: Some(value.created_at.into()),
653 status: proto::admin::v1::ScheduleStatus::from(value.status).into(),
654 stopped_at: value.stopped_at.map(Into::into),
655 }
656 }
657}
658
659impl TryFrom<proto::admin::v1::ScheduleOrderBy>
660 for Option<ora_backend::schedules::ScheduleOrderBy>
661{
662 type Error = tonic::Status;
663
664 fn try_from(value: proto::admin::v1::ScheduleOrderBy) -> Result<Self, Self::Error> {
665 match value {
666 proto::admin::v1::ScheduleOrderBy::Unspecified => Ok(None),
667 proto::admin::v1::ScheduleOrderBy::CreatedAtAsc => {
668 Ok(Some(ora_backend::schedules::ScheduleOrderBy::CreatedAtAsc))
669 }
670 proto::admin::v1::ScheduleOrderBy::CreatedAtDesc => {
671 Ok(Some(ora_backend::schedules::ScheduleOrderBy::CreatedAtDesc))
672 }
673 }
674 }
675}