ora_storage/
lib.rs

1//! Storage interface for the Ora server.
2
3use async_trait::async_trait;
4use futures::stream::BoxStream;
5use ora_proto::{
6    common::{
7        self,
8        v1::{
9            self, schedule_job_creation_policy::JobCreation, schedule_job_timing_policy,
10            JobDefinition, TimeRange,
11        },
12    },
13    server::{self, v1::Job},
14    snapshot,
15};
16use serde::{Deserialize, Serialize};
17use std::time::{Duration, SystemTime};
18use tonic::Status;
19use uuid::Uuid;
20
21/// Re-export the storage types.
22pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
23/// Re-export the storage types.
24pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;
25
26/// An interface for storing and querying job and schedule data used
27/// by the Ora server.
28#[async_trait]
29pub trait Storage: Send + Sync + 'static + Clone {
30    /// Add or update a job type.
31    async fn job_types_added(&self, job_types: Vec<JobType>) -> eyre::Result<()>;
32    /// Persist the given new jobs.
33    async fn jobs_added(&self, jobs: Vec<NewJob>) -> eyre::Result<()>;
34    /// Cancel the given jobs.
35    ///
36    /// - Mark the job as cancelled.
37    /// - Mark the job as unschedulable.
38    ///
39    /// Returns the cancelled jobs with their active executions, if any.
40    async fn jobs_cancelled(
41        &self,
42        job_ids: &[Uuid],
43        timestamp: SystemTime,
44    ) -> eyre::Result<Vec<CancelledJob>>;
45    /// Add new executions.
46    async fn executions_added(
47        &self,
48        executions: Vec<NewExecution>,
49        timestamp: SystemTime,
50    ) -> eyre::Result<()>;
51    /// An execution is ready to be executed.
52    async fn executions_ready(
53        &self,
54        execution_ids: &[Uuid],
55        timestamp: SystemTime,
56    ) -> eyre::Result<()>;
57    /// An execution was assigned to an executor.
58    async fn execution_assigned(
59        &self,
60        execution_id: Uuid,
61        executor_id: Uuid,
62        timestamp: SystemTime,
63    ) -> eyre::Result<()>;
64    /// Set an execution as started.
65    async fn execution_started(
66        &self,
67        execution_id: Uuid,
68        timestamp: SystemTime,
69    ) -> eyre::Result<()>;
70    /// Set an execution as succeeded.
71    async fn execution_succeeded(
72        &self,
73        execution_id: Uuid,
74        timestamp: SystemTime,
75        output_payload_json: String,
76    ) -> eyre::Result<()>;
77    /// Set an execution as failed, if `mark_job_unschedulable` is false
78    /// the job is not marked as unschedulable, this happens when the job
79    /// is retried.
80    async fn executions_failed(
81        &self,
82        execution_ids: &[Uuid],
83        timestamp: SystemTime,
84        reason: String,
85        mark_job_unschedulable: bool,
86    ) -> eyre::Result<()>;
87
88    /// Return assigned executions that are not assigned to any of
89    /// the given executor IDs.
90    async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>>;
91
92    /// Mark a job as unschedulable.
93    ///
94    /// No more executions will be created for the jobs and the given IDs must not be
95    /// returned by the `pending_jobs` method. However existing executions are not affected.
96    async fn jobs_unschedulable(&self, job_ids: &[Uuid], timestamp: SystemTime)
97        -> eyre::Result<()>;
98    /// Executions that are not yet ready to be executed.
99    ///
100    /// The returned values must be in ascending order,
101    /// an optional `after` parameter can be used to filter out values
102    /// that were created before the given UUID.
103    ///
104    /// If the `after` parameter is given, the stream must not include
105    /// the execution with the given UUID and any IDs that are smaller.
106    ///
107    /// The implementation may choose to return a limited
108    /// number of executions in a single call in order to avoid
109    /// resource exhaustion.
110    async fn pending_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingExecution>>;
111    /// Unassigned executions that are ready at the time of the call.
112    ///
113    /// The returned values must be in ascending order,
114    /// an optional `after` parameter can be used to filter out values
115    /// that were created before the given UUID.
116    ///
117    /// The returned executions must be ordered
118    /// by their ID in ascending order.
119    ///
120    /// The implementation may choose to return a limited
121    /// number of executions in a single call in order to avoid
122    /// resource exhaustion.
123    async fn ready_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<ReadyExecution>>;
124    /// Jobs that satisfy the following conditions:
125    ///
126    /// - have no executions in progress
127    /// - are active
128    ///
129    /// The returned values must be in ascending order,
130    /// an optional `after` parameter can be used to filter out values
131    /// that were created before the given UUID.
132    ///
133    /// The implementation may choose to return a limited
134    /// number of executions in a single call in order to avoid
135    /// resource exhaustion.
136    async fn pending_jobs(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingJob>>;
137
138    /// Query and return a list of jobs with the given parameters.
139    ///
140    /// A next token can be provided to continue the query from the last result.
141    async fn query_jobs(
142        &self,
143        cursor: Option<String>,
144        limit: usize,
145        order: JobQueryOrder,
146        filters: JobQueryFilters,
147    ) -> eyre::Result<JobQueryResult>;
148
149    /// Query and return a list of job IDs with the given parameters.
150    async fn query_job_ids(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;
151
152    /// Count the number of jobs that satisfy the given filters.
153    async fn count_jobs(&self, filters: JobQueryFilters) -> eyre::Result<u64>;
154
155    /// Query and return a list of job types with the given parameters.
156    async fn query_job_types(&self) -> eyre::Result<Vec<JobType>>;
157
158    /// Remove jobs and all related data, returns the removed job IDs.
159    ///
160    /// This should also remove all executions created by the jobs.
161    async fn delete_jobs(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;
162
163    /// Persist the given new schedules.
164    async fn schedules_added(&self, schedules: Vec<NewSchedule>) -> eyre::Result<()>;
165
166    /// Cancel the given schedules.
167    ///
168    /// - Mark the schedule as cancelled.
169    /// - Mark the schedule as unschedulable.
170    ///
171    /// Returns the cancelled schedules.
172    async fn schedules_cancelled(
173        &self,
174        schedule_ids: &[Uuid],
175        timestamp: SystemTime,
176    ) -> eyre::Result<Vec<CancelledSchedule>>;
177
178    /// Mark the given schedules as unschedulable.
179    async fn schedules_unschedulable(
180        &self,
181        schedule_ids: &[Uuid],
182        timestamp: SystemTime,
183    ) -> eyre::Result<()>;
184
185    /// Return all pending schedules.
186    ///
187    /// A pending schedule is a schedule that
188    /// satisfies all the following conditions:
189    /// - is active
190    /// - has not been cancelled
191    /// - has no active jobs
192    ///
193    /// The returned values must be in ascending order,
194    /// an optional `after` parameter can be used to filter out values
195    /// that were created before the given UUID.
196    ///
197    /// The implementation may choose to return a limited
198    /// number of executions in a single call in order to avoid
199    /// resource exhaustion.
200    async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>>;
201
202    /// Query and return a list of schedules with the given parameters.
203    ///
204    /// A next token can be provided to continue the query from the last result.
205    async fn query_schedules(
206        &self,
207        cursor: Option<String>,
208        limit: usize,
209        filters: ScheduleQueryFilters,
210        order: ScheduleQueryOrder,
211    ) -> eyre::Result<ScheduleQueryResult>;
212
213    /// Query and return a list of schedule IDs with the given parameters.
214    async fn query_schedule_ids(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;
215
216    /// Count the number of schedules that satisfy the given filters.
217    async fn count_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<u64>;
218
219    /// Remove schedules and all related data, returns the removed schedule IDs.
220    ///
221    /// This should also remove all jobs created by the schedules.
222    async fn delete_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;
223}
224
225/// A trait for storages that support
226/// exporting and importing snapshots of their data.
227#[async_trait]
228pub trait StorageSnapshot {
229    /// Export a snapshot of the storage.
230    fn export_snapshot(&self) -> BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>;
231
232    /// Import a snapshot of the storage.
233    ///
234    /// The snapshot stream must be consumed to completion.
235    ///
236    /// Whether data is overwritten or merged is up to the implementation.
237    async fn import_snapshot(
238        &self,
239        snapshot: BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>,
240    ) -> eyre::Result<()>;
241}
242
243/// Essential data for a job.
244#[derive(Debug, Clone)]
245pub struct NewJob {
246    /// The unique identifier of the job.
247    pub id: Uuid,
248    /// The schedule ID of the job.
249    pub schedule_id: Option<Uuid>,
250    /// The time the job was created.
251    pub created_at: SystemTime,
252    /// The job type ID.
253    pub job_type_id: String,
254    /// The target execution time of the job.
255    pub target_execution_time: SystemTime,
256    /// The input payload of the job.
257    pub input_payload_json: String,
258    /// Timeout policy for the job.
259    pub timeout_policy: JobTimeoutPolicy,
260    /// Retry policy for the job.
261    pub retry_policy: JobRetryPolicy,
262    /// Labels of the job.
263    pub labels: IndexMap<String, String>,
264    /// Arbitrary metadata in JSON format.
265    pub metadata_json: Option<String>,
266}
267
268/// A pending job.
269#[derive(Debug, Clone, PartialEq, Eq)]
270pub struct PendingJob {
271    /// The unique identifier of the job.
272    pub id: Uuid,
273    /// The target execution time of the job.
274    pub target_execution_time: SystemTime,
275    /// The number of previous executions.
276    pub execution_count: u64,
277    /// The retry policy for the job.
278    pub retry_policy: JobRetryPolicy,
279    /// The timeout policy for the job.
280    pub timeout_policy: JobTimeoutPolicy,
281}
282
283/// Job timeout policy.
284#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
285pub struct JobTimeoutPolicy {
286    /// The timeout in seconds.
287    pub timeout: Option<Duration>,
288    /// The base time for the timeout.
289    ///
290    /// The timeout is calculated from this time.
291    pub base_time: JobTimeoutBaseTime,
292}
293
294impl From<v1::JobTimeoutPolicy> for JobTimeoutPolicy {
295    fn from(proto: v1::JobTimeoutPolicy) -> Self {
296        Self {
297            timeout: proto.timeout.and_then(|d| d.try_into().ok()),
298            base_time: proto.base_time().into(),
299        }
300    }
301}
302
303impl From<JobTimeoutPolicy> for v1::JobTimeoutPolicy {
304    fn from(policy: JobTimeoutPolicy) -> Self {
305        Self {
306            timeout: policy.timeout.and_then(|d| d.try_into().ok()),
307            base_time: v1::JobTimeoutBaseTime::from(policy.base_time).into(),
308        }
309    }
310}
311
312impl From<v1::JobTimeoutBaseTime> for JobTimeoutBaseTime {
313    fn from(proto: v1::JobTimeoutBaseTime) -> Self {
314        match proto {
315            v1::JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
316            v1::JobTimeoutBaseTime::StartTime | v1::JobTimeoutBaseTime::Unspecified => {
317                Self::StartTime
318            }
319        }
320    }
321}
322
323impl From<JobTimeoutBaseTime> for v1::JobTimeoutBaseTime {
324    fn from(base_time: JobTimeoutBaseTime) -> Self {
325        match base_time {
326            JobTimeoutBaseTime::StartTime => Self::StartTime,
327            JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
328        }
329    }
330}
331
332/// The base time for the timeout.
333#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
334pub enum JobTimeoutBaseTime {
335    /// The base time is the start time of the job.
336    #[default]
337    StartTime,
338    /// The base time is the target execution time of the job.
339    ///
340    /// Note that if the target execution time is not set,
341    /// the timeout is calculated from the start time of the job.
342    ///
343    /// If the target execution time is in the past,
344    /// the jobs may be immediately timed out.
345    TargetExecutionTime,
346}
347
348/// Job retry policy.
349#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
350pub struct JobRetryPolicy {
351    /// The number of retries for the job.
352    ///
353    /// If the number of retries is zero, the job is not retried.
354    pub retries: u64,
355}
356
357impl From<v1::JobRetryPolicy> for JobRetryPolicy {
358    fn from(proto: v1::JobRetryPolicy) -> Self {
359        Self {
360            retries: proto.retries,
361        }
362    }
363}
364
365impl From<JobRetryPolicy> for v1::JobRetryPolicy {
366    fn from(policy: JobRetryPolicy) -> Self {
367        Self {
368            retries: policy.retries,
369        }
370    }
371}
372
373/// A new pending execution.
374#[derive(Debug, Clone)]
375pub struct NewExecution {
376    /// The unique identifier of the execution.
377    pub id: Uuid,
378    /// The unique identifier of the job.
379    pub job_id: Uuid,
380    /// Attempt number of the execution.
381    pub attempt_number: u64,
382    /// The target execution time of the job.
383    pub target_execution_time: SystemTime,
384}
385
386/// A new pending execution.
387#[derive(Debug, Clone, PartialEq, Eq)]
388pub struct PendingExecution {
389    /// The unique identifier of the execution.
390    pub id: Uuid,
391    /// The target execution time of the job.
392    pub target_execution_time: SystemTime,
393}
394
395/// An execution that is ready to be executed.
396#[derive(Debug, Clone, PartialEq, Eq)]
397pub struct ReadyExecution {
398    /// The unique identifier of the execution.
399    pub id: Uuid,
400    /// The unique identifier of the job.
401    pub job_id: Uuid,
402    /// The input payload of the job.
403    pub input_payload_json: String,
404    /// Attempt number of the execution.
405    pub attempt_number: u64,
406    /// The job type ID.
407    pub job_type_id: String,
408    /// The target execution time of the job.
409    pub target_execution_time: SystemTime,
410    /// Timeout policy for the job.
411    pub timeout_policy: JobTimeoutPolicy,
412}
413
414/// A job type.
415#[derive(Debug, Clone, PartialEq, Eq)]
416pub struct JobType {
417    /// The ID of the job type.
418    pub id: String,
419    /// The name of the job type.
420    pub name: String,
421    /// The description of the job type.
422    pub description: String,
423    /// The input schema of the job type.
424    pub input_schema_json: Option<String>,
425    /// The output schema of the job type.
426    pub output_schema_json: Option<String>,
427}
428
429/// Filters for querying jobs.
430#[derive(Debug, Default, Clone, Serialize, Deserialize)]
431pub struct JobQueryFilters {
432    /// Job IDs to filter the query results by.
433    pub job_ids: Option<IndexSet<Uuid>>,
434    /// Job type IDs to filter the query results by.
435    pub job_type_ids: Option<IndexSet<String>>,
436    /// Execution IDs to filter the query results by.
437    pub execution_ids: Option<IndexSet<Uuid>>,
438    /// Schedule IDs to filter the query results by.
439    pub schedule_ids: Option<IndexSet<Uuid>>,
440    /// Execution status to filter the query results by.
441    pub execution_status: Option<IndexSet<JobExecutionStatus>>,
442    /// Labels to filter the query results by.
443    pub labels: Option<IndexMap<String, JobLabelFilterValue>>,
444    /// Whether to return active or inactive jobs only.
445    pub active: Option<bool>,
446    /// Jobs created after the given time, inclusive.
447    pub created_after: Option<SystemTime>,
448    /// Jobs created before the given time, exclusive.
449    pub created_before: Option<SystemTime>,
450    /// Jobs with target execution time after the given time, inclusive.
451    pub target_execution_time_after: Option<SystemTime>,
452    /// Jobs with target execution time before the given time, exclusive.
453    pub target_execution_time_before: Option<SystemTime>,
454}
455
456/// The order of jobs returned.
457#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
458pub enum JobQueryOrder {
459    /// Order by the time the job was created in ascending order.
460    CreatedAtAsc,
461    /// Order by the time the job was created in descending order.
462    CreatedAtDesc,
463    /// Order by the target execution time in ascending order.
464    TargetExecutionTimeAsc,
465    /// Order by the target execution time in descending order.
466    TargetExecutionTimeDesc,
467}
468
469/// The results of a job query.
470#[derive(Debug, Clone)]
471pub struct JobQueryResult {
472    /// The jobs that satisfy the query.
473    pub jobs: Vec<JobDetails>,
474    /// A cursor that can be used to continue the query.
475    pub cursor: Option<String>,
476    /// Whether there are more results to query.
477    pub has_more: bool,
478}
479
480/// Job status used for querying jobs.
481#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
482pub enum JobExecutionStatus {
483    /// The job execution is pending and is not ready to run.
484    Pending,
485    /// The job execution is ready to run.
486    Ready,
487    /// The job execution is assigned to an executor.
488    Assigned,
489    /// The job execution is running.
490    Running,
491    /// The job execution is completed successfully.
492    Succeeded,
493    /// The job execution is failed.
494    Failed,
495}
496
497/// Job label filter.
498#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
499pub enum JobLabelFilterValue {
500    /// The label exists with any value.
501    Exists,
502    /// The label does not exist.
503    Equals(String),
504}
505
506/// All core information about a job.
507#[derive(Debug, Clone, PartialEq, Eq)]
508pub struct JobDetails {
509    /// Whether the job is active.
510    pub active: bool,
511    /// Whether the job was cancelled.
512    pub cancelled: bool,
513    /// The unique identifier of the job.
514    pub id: Uuid,
515    /// The ID of the job type.
516    pub job_type_id: String,
517    /// The schedule ID of the job.
518    pub schedule_id: Option<Uuid>,
519    /// The target execution time of the job.
520    ///
521    /// If not provided, it should be set to the current time.
522    pub target_execution_time: SystemTime,
523    /// The job input payload JSON that is passed to the executor.
524    pub input_payload_json: String,
525    /// The labels of the job.
526    pub labels: IndexMap<String, String>,
527    /// The timeout policy of the job.
528    pub timeout_policy: JobTimeoutPolicy,
529    /// Retry policy for the job.
530    pub retry_policy: JobRetryPolicy,
531    /// The creation time of the job.
532    pub created_at: SystemTime,
533    /// A list of executions for the job.
534    pub executions: Vec<ExecutionDetails>,
535    /// Arbitrary metadata in JSON format.
536    pub metadata_json: Option<String>,
537}
538
539/// All core information about an execution
540/// that is associated with a job.
541#[derive(Debug, Clone, PartialEq, Eq)]
542pub struct ExecutionDetails {
543    /// The ID of the job execution.
544    pub id: Uuid,
545    /// The ID of the job.
546    pub job_id: Uuid,
547    /// The ID of the associated executor.
548    pub executor_id: Option<Uuid>,
549    /// The status of the job execution.
550    pub status: JobExecutionStatus,
551    /// The time the job execution was created.
552    pub created_at: SystemTime,
553    /// The time the job execution was marked as ready.
554    pub ready_at: Option<SystemTime>,
555    /// The time the job execution was assigned to an executor.
556    pub assigned_at: Option<SystemTime>,
557    /// The time the job execution has started.
558    pub started_at: Option<SystemTime>,
559    /// The time the job execution has succeeded.
560    pub succeeded_at: Option<SystemTime>,
561    /// The time the job execution has failed.
562    pub failed_at: Option<SystemTime>,
563    /// The output payload of the execution.
564    pub output_payload_json: Option<String>,
565    /// The error message of the execution.
566    pub failure_reason: Option<String>,
567}
568
569/// A job that was cancelled.
570#[derive(Debug, Clone, PartialEq, Eq)]
571pub struct CancelledJob {
572    /// The unique identifier of the job.
573    pub id: Uuid,
574    /// Active execution of the job.
575    pub active_execution: Option<Uuid>,
576}
577
578/// A new schedule.
579#[derive(Debug, Clone)]
580pub struct NewSchedule {
581    /// The unique identifier of the schedule.
582    pub id: Uuid,
583    /// The time the schedule was created.
584    pub created_at: SystemTime,
585    /// Scheduling policy for the schedule.
586    pub job_timing_policy: ScheduleJobTimingPolicy,
587    /// Policy for new jobs created by the schedule.
588    pub job_creation_policy: ScheduleJobCreationPolicy,
589    /// Labels of the job.
590    pub labels: IndexMap<String, String>,
591    /// The time range for the schedule.
592    pub time_range: Option<ScheduleTimeRange>,
593    /// Arbitrary metadata in JSON format.
594    pub metadata_json: Option<String>,
595}
596
597/// Scheduling policy for a schedule.
598#[derive(Debug, Clone, PartialEq, Eq)]
599pub enum ScheduleJobTimingPolicy {
600    /// A schedule that repeats.
601    Repeat(SchedulingPolicyRepeat),
602    /// A schedule based on a cron expression.
603    Cron(SchedulingPolicyCron),
604}
605
606/// Scheduling policy for a schedule that repeats.
607#[derive(Debug, Clone, PartialEq, Eq)]
608pub struct SchedulingPolicyRepeat {
609    /// The interval between each job.
610    pub interval: Duration,
611    /// Whether the schedule should create a job immediately.
612    pub immediate: bool,
613    /// The policy for missed jobs.
614    pub missed_policy: ScheduleMissedTimePolicy,
615}
616
617/// Scheduling policy based on a cron expression.
618#[derive(Debug, Clone, PartialEq, Eq)]
619pub struct SchedulingPolicyCron {
620    /// The cron expression.
621    pub cron_expression: String,
622    /// Whether the schedule should create a job immediately.
623    pub immediate: bool,
624    /// The policy for missed jobs.
625    pub missed_policy: ScheduleMissedTimePolicy,
626}
627
628/// Policy for missed jobs.
629#[derive(Debug, Clone, PartialEq, Eq)]
630pub enum ScheduleMissedTimePolicy {
631    /// Skip any missed times.
632    Skip,
633    /// Create a job for each missed time.
634    Create,
635}
636
637/// Policy for new jobs created by a schedule.
638#[derive(Debug, Clone, PartialEq, Eq)]
639pub enum ScheduleJobCreationPolicy {
640    /// Create a new job from the given job definition.
641    JobDefinition(ScheduleNewJobDefinition),
642}
643
644/// A job definition for a new job created by a schedule.
645#[derive(Debug, Clone, PartialEq, Eq)]
646pub struct ScheduleNewJobDefinition {
647    /// The job type ID.
648    pub job_type_id: String,
649    /// The input payload of the job.
650    pub input_payload_json: String,
651    /// Timeout policy for the job.
652    pub timeout_policy: JobTimeoutPolicy,
653    /// Retry policy for the job.
654    pub retry_policy: JobRetryPolicy,
655    /// Labels of the job.
656    pub labels: IndexMap<String, String>,
657}
658
659/// A schedule that was cancelled.
660#[derive(Debug, Clone, PartialEq, Eq)]
661pub struct CancelledSchedule {
662    /// The unique identifier of the schedule.
663    pub id: Uuid,
664}
665
666/// A pending schedule.
667#[derive(Debug, Clone, PartialEq, Eq)]
668pub struct PendingSchedule {
669    /// The unique identifier of the schedule.
670    pub id: Uuid,
671    /// Scheduling policy for the schedule.
672    pub job_timing_policy: ScheduleJobTimingPolicy,
673    /// Policy for new jobs created by the schedule.
674    pub job_creation_policy: ScheduleJobCreationPolicy,
675    /// The target execution time of the last
676    /// job created by the schedule, if any.
677    pub last_target_execution_time: Option<SystemTime>,
678    /// The time range for the schedule.
679    pub time_range: Option<ScheduleTimeRange>,
680}
681
682/// Filters for querying schedules.
683#[derive(Debug, Default, Clone, Serialize, Deserialize)]
684pub struct ScheduleQueryFilters {
685    /// Schedule IDs to filter the query results by.
686    pub schedule_ids: Option<IndexSet<Uuid>>,
687    /// Job IDs to filter the query results by.
688    pub job_ids: Option<IndexSet<Uuid>>,
689    /// Job type IDs to filter the query results by.
690    pub job_type_ids: Option<IndexSet<String>>,
691    /// Labels to filter the query results by.
692    pub labels: Option<IndexMap<String, ScheduleLabelFilterValue>>,
693    /// Whether to return active or inactive schedules only.
694    pub active: Option<bool>,
695    /// Schedules created after the given time, inclusive.
696    pub created_after: Option<SystemTime>,
697    /// Schedules created before the given time, exclusive.
698    pub created_before: Option<SystemTime>,
699}
700
701/// The order of jobs returned.
702#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
703pub enum ScheduleQueryOrder {
704    /// Order by the time the job was created in ascending order.
705    CreatedAtAsc,
706    /// Order by the time the job was created in descending order.
707    CreatedAtDesc,
708}
709
710/// The results of a schedule query.
711#[derive(Debug, Clone)]
712pub struct ScheduleQueryResult {
713    /// The schedules that satisfy the query.
714    pub schedules: Vec<ScheduleDetails>,
715    /// A cursor that can be used to continue the query.
716    pub cursor: Option<String>,
717    /// Whether there are more results to query.
718    pub has_more: bool,
719}
720
721/// Details of a schedule.
722///
723/// Associated jobs are not included on purpose
724/// as there can be many jobs associated with a schedule,
725/// additional queries can be made to get the jobs.
726#[derive(Debug, Clone, PartialEq, Eq)]
727pub struct ScheduleDetails {
728    /// The unique identifier of the schedule.
729    pub id: Uuid,
730    /// The time the schedule was created.
731    pub created_at: SystemTime,
732    /// Scheduling policy for the schedule.
733    pub job_timing_policy: ScheduleJobTimingPolicy,
734    /// Policy for new jobs created by the schedule.
735    pub job_creation_policy: ScheduleJobCreationPolicy,
736    /// Labels of the schedule.
737    pub labels: IndexMap<String, String>,
738    /// Whether the schedule is active.
739    pub active: bool,
740    /// Whether the schedule was cancelled.
741    pub cancelled: bool,
742    /// The time range for the schedule.
743    pub time_range: Option<ScheduleTimeRange>,
744    /// Arbitrary metadata in JSON format.
745    pub metadata_json: Option<String>,
746}
747
748/// Schedule label filter.
749#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
750pub enum ScheduleLabelFilterValue {
751    /// The label exists with any value.
752    Exists,
753    /// The label does not exist.
754    Equals(String),
755}
756
757/// The time range for a schedule.
758#[derive(Debug, Clone, Copy, PartialEq, Eq)]
759pub struct ScheduleTimeRange {
760    /// The schedule must not start before this time.
761    pub start: Option<SystemTime>,
762    /// The schedule must end before this time.
763    pub end: Option<SystemTime>,
764}
765
766impl ScheduleTimeRange {
767    /// Check if the given time is within the time range.
768    #[must_use]
769    pub fn contains(&self, time: SystemTime) -> bool {
770        if let Some(start) = self.start {
771            if time < start {
772                return false;
773            }
774        }
775        if let Some(end) = self.end {
776            if time >= end {
777                return false;
778            }
779        }
780        true
781    }
782
783    /// Check if the time range is valid.
784    #[must_use]
785    pub fn is_valid(&self) -> bool {
786        if let (Some(start), Some(end)) = (self.start, self.end) {
787            start < end
788        } else {
789            true
790        }
791    }
792}
793
794impl From<ScheduleJobTimingPolicy> for common::v1::ScheduleJobTimingPolicy {
795    fn from(value: ScheduleJobTimingPolicy) -> Self {
796        match value {
797            ScheduleJobTimingPolicy::Repeat(policy) => common::v1::ScheduleJobTimingPolicy {
798                job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
799                    common::v1::ScheduleJobTimingPolicyRepeat {
800                        interval: Some(policy.interval.try_into().unwrap()),
801                        immediate: policy.immediate,
802                        missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
803                            policy.missed_policy,
804                        )
805                        .into(),
806                    },
807                )),
808            },
809            ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
810                job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
811                    common::v1::ScheduleJobTimingPolicyCron {
812                        cron_expression: policy.cron_expression,
813                        immediate: policy.immediate,
814                        missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
815                            policy.missed_policy,
816                        )
817                        .into(),
818                    },
819                )),
820            },
821        }
822    }
823}
824
825impl TryFrom<common::v1::ScheduleJobTimingPolicy> for ScheduleJobTimingPolicy {
826    type Error = Status;
827
828    fn try_from(value: common::v1::ScheduleJobTimingPolicy) -> Result<Self, Self::Error> {
829        let value = value.job_timing.ok_or_else(|| {
830            Status::invalid_argument("missing job timing policy in schedule definition")
831        })?;
832
833        match value {
834            schedule_job_timing_policy::JobTiming::Repeat(policy) => {
835                Ok(Self::Repeat(SchedulingPolicyRepeat {
836                    interval: policy
837                        .interval
838                        .ok_or_else(|| {
839                            Status::invalid_argument("missing interval in repeat policy")
840                        })?
841                        .try_into()
842                        .map_err(|_| Status::invalid_argument("invalid interval"))?,
843                    immediate: policy.immediate,
844                    missed_policy: policy.missed_time_policy().into(),
845                }))
846            }
847            schedule_job_timing_policy::JobTiming::Cron(policy) => {
848                Ok(Self::Cron(SchedulingPolicyCron {
849                    missed_policy: policy.missed_time_policy().into(),
850                    cron_expression: {
851                        // Validate the cron expression.
852                        cronexpr::parse_crontab(&policy.cron_expression).map_err(|err| {
853                            Status::invalid_argument(format!("invalid cron expression: {err}"))
854                        })?;
855
856                        policy.cron_expression
857                    },
858                    immediate: policy.immediate,
859                }))
860            }
861        }
862    }
863}
864
865impl From<ScheduleJobCreationPolicy> for common::v1::ScheduleJobCreationPolicy {
866    fn from(value: ScheduleJobCreationPolicy) -> Self {
867        match value {
868            ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
869                common::v1::ScheduleJobCreationPolicy {
870                    job_creation: Some(JobCreation::JobDefinition(common::v1::JobDefinition {
871                        job_type_id: job_definition.job_type_id,
872                        input_payload_json: job_definition.input_payload_json,
873                        target_execution_time: None,
874                        retry_policy: Some(common::v1::JobRetryPolicy {
875                            retries: job_definition.retry_policy.retries,
876                        }),
877                        timeout_policy: Some(common::v1::JobTimeoutPolicy {
878                            timeout: job_definition
879                                .timeout_policy
880                                .timeout
881                                .and_then(|d| d.try_into().ok()),
882                            base_time: match job_definition.timeout_policy.base_time {
883                                JobTimeoutBaseTime::StartTime => {
884                                    common::v1::JobTimeoutBaseTime::StartTime.into()
885                                }
886                                JobTimeoutBaseTime::TargetExecutionTime => {
887                                    common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
888                                }
889                            },
890                        }),
891                        labels: job_definition
892                            .labels
893                            .into_iter()
894                            .map(|(key, value)| common::v1::JobLabel { key, value })
895                            .collect(),
896                        metadata_json: None,
897                    })),
898                }
899            }
900        }
901    }
902}
903
904impl TryFrom<common::v1::ScheduleJobCreationPolicy> for ScheduleJobCreationPolicy {
905    type Error = Status;
906
907    fn try_from(value: common::v1::ScheduleJobCreationPolicy) -> Result<Self, Self::Error> {
908        let value = value.job_creation.ok_or_else(|| {
909            Status::invalid_argument("missing job creation policy in schedule definition")
910        })?;
911
912        match value {
913            JobCreation::JobDefinition(job_definition) => {
914                Ok(Self::JobDefinition(ScheduleNewJobDefinition {
915                    job_type_id: job_definition.job_type_id,
916                    input_payload_json: job_definition.input_payload_json,
917                    timeout_policy: job_definition
918                        .timeout_policy
919                        .map(Into::into)
920                        .unwrap_or_default(),
921                    retry_policy: job_definition
922                        .retry_policy
923                        .map(Into::into)
924                        .unwrap_or_default(),
925                    labels: job_definition
926                        .labels
927                        .into_iter()
928                        .map(|label| (label.key, label.value))
929                        .collect(),
930                }))
931            }
932        }
933    }
934}
935
936impl From<ScheduleTimeRange> for TimeRange {
937    fn from(value: ScheduleTimeRange) -> Self {
938        Self {
939            start: value.start.map(Into::into),
940            end: value.end.map(Into::into),
941        }
942    }
943}
944
945impl TryFrom<TimeRange> for ScheduleTimeRange {
946    type Error = Status;
947
948    fn try_from(value: TimeRange) -> Result<Self, Self::Error> {
949        Ok(Self {
950            start: value
951                .start
952                .map(SystemTime::try_from)
953                .transpose()
954                .map_err(|error| {
955                    Status::invalid_argument(format!("unsupported timestamp: {error}"))
956                })?,
957            end: value
958                .end
959                .map(SystemTime::try_from)
960                .transpose()
961                .map_err(|error| {
962                    Status::invalid_argument(format!("unsupported timestamp: {error}"))
963                })?,
964        })
965    }
966}
967
968impl From<common::v1::ScheduleMissedTimePolicy> for ScheduleMissedTimePolicy {
969    fn from(value: common::v1::ScheduleMissedTimePolicy) -> Self {
970        match value {
971            common::v1::ScheduleMissedTimePolicy::Skip
972            | common::v1::ScheduleMissedTimePolicy::Unspecified => Self::Skip,
973            common::v1::ScheduleMissedTimePolicy::Create => Self::Create,
974        }
975    }
976}
977
978impl From<ScheduleMissedTimePolicy> for common::v1::ScheduleMissedTimePolicy {
979    fn from(value: ScheduleMissedTimePolicy) -> Self {
980        match value {
981            ScheduleMissedTimePolicy::Skip => Self::Skip,
982            ScheduleMissedTimePolicy::Create => Self::Create,
983        }
984    }
985}
986
987impl From<server::v1::JobQueryOrder> for JobQueryOrder {
988    fn from(value: server::v1::JobQueryOrder) -> Self {
989        match value {
990            server::v1::JobQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
991            server::v1::JobQueryOrder::CreatedAtDesc | server::v1::JobQueryOrder::Unspecified => {
992                Self::CreatedAtDesc
993            }
994            server::v1::JobQueryOrder::TargetExecutionTimeAsc => Self::TargetExecutionTimeAsc,
995            server::v1::JobQueryOrder::TargetExecutionTimeDesc => Self::TargetExecutionTimeDesc,
996        }
997    }
998}
999
1000impl From<JobDetails> for Job {
1001    fn from(job: JobDetails) -> Self {
1002        Job {
1003            id: job.id.to_string(),
1004            schedule_id: job.schedule_id.map(|t| t.to_string()),
1005            cancelled: job.cancelled,
1006            active: job.active,
1007            definition: Some(JobDefinition {
1008                job_type_id: job.job_type_id,
1009                input_payload_json: job.input_payload_json,
1010                target_execution_time: Some(job.target_execution_time.into()),
1011                retry_policy: Some(common::v1::JobRetryPolicy {
1012                    retries: job.retry_policy.retries,
1013                }),
1014                timeout_policy: Some(common::v1::JobTimeoutPolicy {
1015                    timeout: job.timeout_policy.timeout.and_then(|d| d.try_into().ok()),
1016                    base_time: match job.timeout_policy.base_time {
1017                        JobTimeoutBaseTime::StartTime => {
1018                            common::v1::JobTimeoutBaseTime::StartTime.into()
1019                        }
1020                        JobTimeoutBaseTime::TargetExecutionTime => {
1021                            common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
1022                        }
1023                    },
1024                }),
1025                labels: job
1026                    .labels
1027                    .into_iter()
1028                    .map(|(key, value)| common::v1::JobLabel { key, value })
1029                    .collect(),
1030                metadata_json: job.metadata_json,
1031            }),
1032            created_at: Some(job.created_at.into()),
1033            executions: job.executions.into_iter().map(Into::into).collect(),
1034        }
1035    }
1036}
1037
1038impl From<ExecutionDetails> for server::v1::JobExecution {
1039    fn from(value: ExecutionDetails) -> Self {
1040        Self {
1041            id: value.id.to_string(),
1042            job_id: value.job_id.to_string(),
1043            executor_id: value.executor_id.map(|t| t.to_string()),
1044            status: match value.status {
1045                JobExecutionStatus::Pending => server::v1::JobExecutionStatus::Pending,
1046                JobExecutionStatus::Ready => server::v1::JobExecutionStatus::Ready,
1047                JobExecutionStatus::Assigned => server::v1::JobExecutionStatus::Assigned,
1048                JobExecutionStatus::Running => server::v1::JobExecutionStatus::Running,
1049                JobExecutionStatus::Succeeded => server::v1::JobExecutionStatus::Succeeded,
1050                JobExecutionStatus::Failed => server::v1::JobExecutionStatus::Failed,
1051            }
1052            .into(),
1053            created_at: Some(value.created_at.into()),
1054            ready_at: value.ready_at.map(Into::into),
1055            assigned_at: value.assigned_at.map(Into::into),
1056            started_at: value.started_at.map(Into::into),
1057            succeeded_at: value.succeeded_at.map(Into::into),
1058            failed_at: value.failed_at.map(Into::into),
1059            output_payload_json: value.output_payload_json,
1060            failure_reason: value.failure_reason,
1061        }
1062    }
1063}
1064
1065impl From<JobType> for ora_proto::common::v1::JobType {
1066    fn from(value: JobType) -> Self {
1067        Self {
1068            id: value.id,
1069            name: if value.name.is_empty() {
1070                None
1071            } else {
1072                Some(value.name)
1073            },
1074            description: if value.description.is_empty() {
1075                None
1076            } else {
1077                Some(value.description)
1078            },
1079            input_schema_json: value.input_schema_json,
1080            output_schema_json: value.output_schema_json,
1081        }
1082    }
1083}
1084
1085impl TryFrom<server::v1::JobQueryFilter> for JobQueryFilters {
1086    type Error = Status;
1087
1088    fn try_from(filter: server::v1::JobQueryFilter) -> Result<Self, Self::Error> {
1089        Ok(Self {
1090            execution_status: {
1091                let status = filter
1092                    .status()
1093                    .filter_map(|status| match status {
1094                        server::v1::JobExecutionStatus::Unspecified => None,
1095                        server::v1::JobExecutionStatus::Pending => {
1096                            Some(JobExecutionStatus::Pending)
1097                        }
1098                        server::v1::JobExecutionStatus::Ready => Some(JobExecutionStatus::Ready),
1099                        server::v1::JobExecutionStatus::Assigned => {
1100                            Some(JobExecutionStatus::Assigned)
1101                        }
1102                        server::v1::JobExecutionStatus::Running => {
1103                            Some(JobExecutionStatus::Running)
1104                        }
1105                        server::v1::JobExecutionStatus::Succeeded => {
1106                            Some(JobExecutionStatus::Succeeded)
1107                        }
1108                        server::v1::JobExecutionStatus::Failed => Some(JobExecutionStatus::Failed),
1109                    })
1110                    .collect::<IndexSet<_>>();
1111
1112                if status.is_empty() {
1113                    None
1114                } else {
1115                    Some(status)
1116                }
1117            },
1118            job_ids: if filter.job_ids.is_empty() {
1119                None
1120            } else {
1121                Some(
1122                    filter
1123                        .job_ids
1124                        .iter()
1125                        .map(|f| {
1126                            f.parse().map_err(|err| {
1127                                Status::invalid_argument(format!("invalid job ID: {err}"))
1128                            })
1129                        })
1130                        .collect::<Result<_, _>>()?,
1131                )
1132            },
1133            job_type_ids: if filter.job_type_ids.is_empty() {
1134                None
1135            } else {
1136                Some(filter.job_type_ids.into_iter().collect())
1137            },
1138            execution_ids: if filter.execution_ids.is_empty() {
1139                None
1140            } else {
1141                Some(
1142                    filter
1143                        .execution_ids
1144                        .iter()
1145                        .map(|f| {
1146                            f.parse().map_err(|err| {
1147                                Status::invalid_argument(format!("invalid execution ID: {err}"))
1148                            })
1149                        })
1150                        .collect::<Result<_, _>>()?,
1151                )
1152            },
1153            schedule_ids: if filter.schedule_ids.is_empty() {
1154                None
1155            } else {
1156                Some(
1157                    filter
1158                        .schedule_ids
1159                        .iter()
1160                        .map(|f| {
1161                            f.parse().map_err(|err| {
1162                                Status::invalid_argument(format!("invalid schedule ID: {err}"))
1163                            })
1164                        })
1165                        .collect::<Result<_, _>>()?,
1166                )
1167            },
1168            labels: if filter.labels.is_empty() {
1169                None
1170            } else {
1171                Some(
1172                    filter
1173                        .labels
1174                        .into_iter()
1175                        .filter_map(|label| {
1176                            Some((
1177                                label.key,
1178                                match label.value? {
1179                                    server::v1::job_label_filter::Value::Exists(_) => {
1180                                        JobLabelFilterValue::Exists
1181                                    }
1182                                    server::v1::job_label_filter::Value::Equals(value) => {
1183                                        JobLabelFilterValue::Equals(value)
1184                                    }
1185                                },
1186                            ))
1187                        })
1188                        .collect(),
1189                )
1190            },
1191            active: filter.active,
1192            created_after: filter
1193                .created_at
1194                .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1195            created_before: filter
1196                .created_at
1197                .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1198            target_execution_time_after: filter
1199                .target_execution_time
1200                .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1201            target_execution_time_before: filter
1202                .target_execution_time
1203                .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1204        })
1205    }
1206}
1207
1208impl From<ScheduleDetails> for server::v1::Schedule {
1209    fn from(value: ScheduleDetails) -> Self {
1210        Self {
1211            id: value.id.to_string(),
1212            definition: Some(common::v1::ScheduleDefinition {
1213                job_timing_policy: Some(match value.job_timing_policy {
1214                    ScheduleJobTimingPolicy::Repeat(policy) => {
1215                        common::v1::ScheduleJobTimingPolicy {
1216                            job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
1217                                common::v1::ScheduleJobTimingPolicyRepeat {
1218                                    interval: policy.interval.try_into().ok(),
1219                                    immediate: policy.immediate,
1220                                    missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1221                                        policy.missed_policy,
1222                                    )
1223                                    .into(),
1224                                },
1225                            )),
1226                        }
1227                    }
1228                    ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
1229                        job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
1230                            common::v1::ScheduleJobTimingPolicyCron {
1231                                cron_expression: policy.cron_expression,
1232                                immediate: policy.immediate,
1233                                missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1234                                    policy.missed_policy,
1235                                )
1236                                .into(),
1237                            },
1238                        )),
1239                    },
1240                }),
1241                job_creation_policy: Some(common::v1::ScheduleJobCreationPolicy {
1242                    job_creation: Some(match value.job_creation_policy {
1243                        ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
1244                            JobCreation::JobDefinition(common::v1::JobDefinition {
1245                                job_type_id: job_definition.job_type_id,
1246                                input_payload_json: job_definition.input_payload_json,
1247                                target_execution_time: None,
1248                                retry_policy: Some(common::v1::JobRetryPolicy {
1249                                    retries: job_definition.retry_policy.retries,
1250                                }),
1251                                timeout_policy: Some(common::v1::JobTimeoutPolicy {
1252                                    timeout: job_definition
1253                                        .timeout_policy
1254                                        .timeout
1255                                        .and_then(|d| d.try_into().ok()),
1256                                    base_time: match job_definition.timeout_policy.base_time {
1257                                        JobTimeoutBaseTime::StartTime => {
1258                                            common::v1::JobTimeoutBaseTime::StartTime.into()
1259                                        }
1260                                        JobTimeoutBaseTime::TargetExecutionTime => {
1261                                            common::v1::JobTimeoutBaseTime::TargetExecutionTime
1262                                                .into()
1263                                        }
1264                                    },
1265                                }),
1266                                labels: job_definition
1267                                    .labels
1268                                    .into_iter()
1269                                    .map(|(key, value)| common::v1::JobLabel { key, value })
1270                                    .collect(),
1271                                metadata_json: None,
1272                            })
1273                        }
1274                    }),
1275                }),
1276                time_range: value.time_range.map(|range| TimeRange {
1277                    start: range.start.map(Into::into),
1278                    end: range.end.map(Into::into),
1279                }),
1280                labels: value
1281                    .labels
1282                    .into_iter()
1283                    .map(|(key, value)| common::v1::ScheduleLabel { key, value })
1284                    .collect(),
1285                metadata_json: value.metadata_json,
1286            }),
1287            created_at: Some(value.created_at.into()),
1288            active: value.active,
1289            cancelled: value.cancelled,
1290        }
1291    }
1292}
1293
1294impl From<server::v1::ScheduleQueryOrder> for ScheduleQueryOrder {
1295    fn from(value: server::v1::ScheduleQueryOrder) -> Self {
1296        match value {
1297            server::v1::ScheduleQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
1298            server::v1::ScheduleQueryOrder::CreatedAtDesc
1299            | server::v1::ScheduleQueryOrder::Unspecified => Self::CreatedAtDesc,
1300        }
1301    }
1302}
1303
1304impl TryFrom<server::v1::ScheduleQueryFilter> for ScheduleQueryFilters {
1305    type Error = Status;
1306
1307    fn try_from(value: server::v1::ScheduleQueryFilter) -> Result<Self, Self::Error> {
1308        Ok(Self {
1309            schedule_ids: if value.schedule_ids.is_empty() {
1310                None
1311            } else {
1312                Some(
1313                    value
1314                        .schedule_ids
1315                        .iter()
1316                        .map(|f| {
1317                            f.parse().map_err(|err| {
1318                                Status::invalid_argument(format!("invalid schedule ID: {err}"))
1319                            })
1320                        })
1321                        .collect::<Result<_, _>>()?,
1322                )
1323            },
1324            job_ids: if value.job_ids.is_empty() {
1325                None
1326            } else {
1327                Some(
1328                    value
1329                        .job_ids
1330                        .iter()
1331                        .map(|f| {
1332                            f.parse().map_err(|err| {
1333                                Status::invalid_argument(format!("invalid job ID: {err}"))
1334                            })
1335                        })
1336                        .collect::<Result<_, _>>()?,
1337                )
1338            },
1339            job_type_ids: if value.job_type_ids.is_empty() {
1340                None
1341            } else {
1342                Some(value.job_type_ids.into_iter().collect())
1343            },
1344            labels: if value.labels.is_empty() {
1345                None
1346            } else {
1347                Some(
1348                    value
1349                        .labels
1350                        .into_iter()
1351                        .filter_map(|label| {
1352                            Some((
1353                                label.key,
1354                                match label.value? {
1355                                    server::v1::schedule_label_filter::Value::Exists(_) => {
1356                                        ScheduleLabelFilterValue::Exists
1357                                    }
1358                                    server::v1::schedule_label_filter::Value::Equals(value) => {
1359                                        ScheduleLabelFilterValue::Equals(value)
1360                                    }
1361                                },
1362                            ))
1363                        })
1364                        .collect(),
1365                )
1366            },
1367            active: value.active,
1368            created_after: value
1369                .created_at
1370                .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1371            created_before: value
1372                .created_at
1373                .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1374        })
1375    }
1376}
1377
1378impl From<JobType> for snapshot::v1::ExportedJobType {
1379    fn from(job_type: JobType) -> Self {
1380        snapshot::v1::ExportedJobType {
1381            job_type: Some(common::v1::JobType {
1382                id: job_type.id,
1383                name: Some(job_type.name),
1384                description: Some(job_type.description),
1385                input_schema_json: job_type.input_schema_json,
1386                output_schema_json: job_type.output_schema_json,
1387            }),
1388        }
1389    }
1390}
1391
1392impl From<snapshot::v1::ExportedJobType> for JobType {
1393    fn from(job_type: snapshot::v1::ExportedJobType) -> Self {
1394        let job_type = job_type.job_type.unwrap();
1395
1396        Self {
1397            id: job_type.id,
1398            name: job_type.name.unwrap_or_default(),
1399            description: job_type.description.unwrap_or_default(),
1400            input_schema_json: job_type.input_schema_json,
1401            output_schema_json: job_type.output_schema_json,
1402        }
1403    }
1404}